"""ChromaDB-backed knowledge-base helpers.

Text is split into chunks and stored in ChromaDB collections (one collection
per knowledge base, named by the knowledge base UUID).  A Redis set per
knowledge base records which file IDs have already been ingested so that
``KBaseChroma`` can add new files and drop removed ones before querying.
"""
import hashlib
import json
import time

import chromadb
import redis
from langchain.text_splitter import CharacterTextSplitter

import fileCon

# In-memory alternative, kept for local experiments:
# chroma_client = chromadb.Client()
chroma_client = chromadb.PersistentClient(path=r"E:\Code\saw-ai\chroma")  # persistent on-disk storage
collection = chroma_client.get_or_create_collection("test_collection")

r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)


def _redis_file_id_key(uuid):
    """Return the Redis key of the ingested-file-ID set for collection *uuid*."""
    return "kbase_py_" + uuid + "_file_id_set"


def _chunk_ids(documents, suffix=""):
    """Return one SHA-256 hex ID per chunk, derived from chunk text + suffix + index."""
    return [
        hashlib.sha256((doc + suffix + str(i)).encode()).hexdigest()
        for i, doc in enumerate(documents)
    ]


def _file_id_where(file_id):
    """Build a Chroma ``where`` filter matching *file_id* stored as int or str.

    Metadata stores ``user_file_id`` exactly as the caller supplied it at
    ingestion time, while IDs read back from Redis (``decode_responses=True``)
    are always strings, so both representations are matched.
    """
    variants = [file_id]
    if isinstance(file_id, str):
        try:
            variants.append(int(file_id))
        except ValueError:
            pass
    elif isinstance(file_id, int) and not isinstance(file_id, bool):
        variants.append(str(file_id))

    if len(variants) == 1:
        return {"user_file_id": file_id}
    return {"$or": [{"user_file_id": variant} for variant in variants]}


def _ingest_user_file(kcollection, userID, file_id):
    """Fetch one file through ``fileCon``, split it and add its chunks to *kcollection*.

    Returns True when chunks were added, False when the file information or
    content could not be obtained or produced no chunks.
    """
    # Look up the user's file record.
    userFile = fileCon.get_user_file_info(userID, file_id)
    if userFile is None:
        print("获取文件信息失败,userfileID:", file_id)
        return False

    # Fetch the file content.
    data = fileCon.get_file_content(file_id)
    if data is None or len(data) == 0:
        print("获取文件内容失败,file_id:", file_id)
        return False

    text = data[0]["FileContent"]
    if not text:
        print("获取文件内容失败,file_id:", file_id)
        return False
    print("text:", text[:100])
    print("text length:", len(text), "file_id:", file_id)

    # Split the text into chunks.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20, separator=".")
    documents = text_splitter.split_text(text)
    if not documents:
        # collection.add() rejects empty input, so there is nothing to store.
        print("no text chunks produced, file_id:", file_id)
        return False
    print("split documents:", len(documents[0]), " length:", len(documents))

    # NOTE(review): IDs depend only on chunk text and index, so identical
    # chunks at the same position in two different files share an ID.
    ids = _chunk_ids(documents)
    metadata = [
        {"source": userFile["UserFileName"], "user_file_id": file_id, "id": chunk_id}
        for chunk_id in ids
    ]
    print("metadata:", metadata[0], " length:", len(metadata))
    print("documents:", len(documents[0]), " length:", len(documents))
    print("ids:", len(ids[0]), " length:", len(ids))

    kcollection.add(
        documents=documents,
        metadatas=metadata,
        ids=ids
    )
    return True


def add_text_to_chroma(text, metadata=None):
    """Split *text* into chunks and add them to the default test collection.

    Args:
        text: Text to split and store.
        metadata: Optional metadata dict attached to every chunk.

    Errors are printed, not raised.
    """
    try:
        # Split the text into chunks.
        text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20)
        documents = text_splitter.split_text(text)
        if not documents:
            print("no text chunks produced, nothing added to ChromaDB")
            return
        print("documents_:", len(documents[0]))

        # The millisecond timestamp makes IDs unique per call.
        timestamp = str(int(time.time() * 1000))
        ids = _chunk_ids(documents, timestamp)
        print("ids:", len(ids[0]))

        collection.add(
            documents=documents,
            metadatas=[metadata] * len(documents) if metadata else [None] * len(documents),
            ids=ids
        )
        print("文本已成功添加到 ChromaDB")
        print("集合中的文档数量:", collection.count())
    except Exception as e:
        print(f"添加文本到 ChromaDB 时发生错误: {e}")


def query_chroma(query_text, n_results=5):
    """Query the default test collection; return the raw result or None on error."""
    try:
        results = collection.query(
            query_texts=[query_text],
            n_results=n_results
        )
        return results
    except Exception as e:
        print(f"查询 ChromaDB 时发生错误: {e}")
        return None


def testChroma():
    """Smoke test: add a short text to the default collection, then query it."""
    text = "这是一个测试文本。"
    metadata = {"source": "test.txt"}
    add_text_to_chroma(text, metadata)

    query_text = "测试"
    results = query_chroma(query_text)
    print("查询结果:", json.dumps(results, ensure_ascii=False, indent=4))
    if results:
        print("查询结果:", results)


def KBaseTextAdd(kbase, userID):
    """Ingest every file listed in ``kbase["FileIDs"]`` into the kbase collection.

    Args:
        kbase: Dict with ``"UUID"`` (collection name) and ``"FileIDs"``
            (JSON string holding a list of ``{"file_id": ...}`` objects).
        userID: Owner of the files, passed to ``fileCon.get_user_file_info``.

    Files whose information or content cannot be fetched are skipped.
    """
    fileIDs = json.loads(kbase["FileIDs"])
    collection_name = kbase["UUID"]
    print("collection_name:", collection_name)
    kcollection = chroma_client.get_or_create_collection(collection_name)

    for userFileID in fileIDs:
        if _ingest_user_file(kcollection, userID, userFileID["file_id"]):
            print("文本已成功添加到 ChromaDB")


def KBaseTextAddByUserFileID(collection_name, userID, file_id):
    """Ingest one file into *collection_name* and record its ID in Redis.

    Returns:
        ``"success"`` when the chunks were stored and the file ID was added to
        the Redis set, ``None`` when the file could not be ingested.
    """
    kcollection = chroma_client.get_or_create_collection(collection_name)
    if not _ingest_user_file(kcollection, userID, file_id):
        return None

    # Remember that this file is already in the collection.
    key = _redis_file_id_key(collection_name)
    r.sadd(key, file_id)
    print("文本已成功添加到 ChromaDB,文件ID集合已成功添加到Redis:", key)
    return "success"


def KBaseQuery(kbase, query_text, n_results=3):
    """Query the collection named ``kbase["UUID"]``; return None on error."""
    try:
        collection_name = kbase["UUID"]
        print("collection_name:", collection_name)
        kcollection = chroma_client.get_or_create_collection(collection_name)
        results = kcollection.query(
            query_texts=[query_text],
            n_results=n_results
        )
        return results
    except Exception as e:
        print(f"查询 ChromaDB 时发生错误: {e}")
        return None


def readKBaseRedisFileIDSet(uuid):
    """Return the ingested file IDs recorded in Redis for *uuid*, or None on error."""
    try:
        key = _redis_file_id_key(uuid)
        print("read key:", key)
        # Reuse the module-level client instead of opening a new one per call.
        return r.smembers(key)
    except Exception as e:
        print(f"读取Redis文件ID集合时发生错误: {e}")
        return None


def writeKBaseRedisFileIDSet(uuid, file_id_set):
    """Add *file_id_set* to the Redis set of ingested file IDs for *uuid*.

    Returns:
        ``None`` on success, ``"error"`` when the Redis write fails.
    """
    try:
        key = _redis_file_id_key(uuid)
        print("write key:", key)
        # SADD needs at least one member; an empty set is a no-op.
        if file_id_set:
            # Write to the same key readKBaseRedisFileIDSet reads from.
            r.sadd(key, *file_id_set)
        print("文件ID集合已成功写入Redis")
    except Exception as e:
        print(f"写入Redis文件ID集合时发生错误: {e}")
        return "error"
    return None


def deleteCollectionFileByUserFileID(collection_name, file_id):
    """Remove every chunk of *file_id* from *collection_name* and from Redis.

    Returns:
        ``"success"`` on success, ``None`` when ChromaDB or Redis raised.
    """
    try:
        kcollection = chroma_client.get_or_create_collection(collection_name)
        kcollection.delete(where=_file_id_where(file_id))
        # Must be the same key KBaseTextAddByUserFileID adds to; otherwise the
        # ID stays in the set and is deleted again on every synchronisation.
        r.srem(_redis_file_id_key(collection_name), file_id)
        print("文件ID集合已成功删除")
        return "success"
    except Exception as e:
        print(f"删除文件ID集合时发生错误: {e}")
        return None


def KBaseChroma(data):
    """Synchronise a knowledge base's collection with its file list, then query it.

    Args:
        data: Dict with
            ``data["knowledge_base"]``: ``"UUID"`` and ``"FileIDs"`` (JSON
            string such as ``'[{"file_id": 6}]'``);
            ``data["im_context"]``: ``"user_id"`` and ``"question"``.

    Returns:
        The result of ``KBaseQuery`` for the question (top 3), or None on
        query error.
    """
    kbase = data["knowledge_base"]
    userID = data["im_context"]["user_id"]
    query_text = data["im_context"]["question"]
    fileIDs = json.loads(kbase["FileIDs"])
    collection_name = kbase["UUID"]

    # Compare the requested file IDs with those already ingested.
    fileIDSet = readKBaseRedisFileIDSet(collection_name)
    print("fileIDSet:", fileIDSet)

    if fileIDSet is None:
        # Redis state is unknown, so skip synchronisation rather than guess.
        print("file ID set unavailable, skipping synchronisation")
    else:
        stored = set(fileIDSet)
        # Redis members are strings, so compare on the string form but keep
        # the original value for ingestion.  The full requested set is built
        # first so that the deletion step never sees a partial set.
        requested = {}
        for fileID in fileIDs:
            requested.setdefault(str(fileID["file_id"]), fileID["file_id"])

        for id_text, original_id in requested.items():
            if id_text in stored:
                print("文件ID已存在:", original_id)
                continue
            KBaseTextAddByUserFileID(collection_name, userID, original_id)

        # Files recorded in Redis but no longer listed must be removed.
        sdel = stored - set(requested)
        print("sdel:", sdel)
        for fileID in sdel:
            deleteCollectionFileByUserFileID(collection_name, fileID)

    return KBaseQuery(kbase, query_text, 3)


# Manual smoke test (kept commented out so importing the module has no extra
# side effects):
#
# if __name__ == "__main__":
#     payload = {
#         "im_context": {"user_id": 1, "question": "test"},
#         "knowledge_base": {
#             "UUID": "0ba47d27-767b-40d4-91e6-cb1adfdcfcff",
#             "FileIDs": "[{\"file_id\":6}]",
#         },
#     }
#     print(KBaseChroma(payload))
#     print(deleteCollectionFileByUserFileID(payload["knowledge_base"]["UUID"], 6))
#     print(KBaseQuery(payload["knowledge_base"], "AI grade your essays", 3))
#     testChroma()