saw-go/kbase_py/fileChroma.py

293 lines
11 KiB
Python
Raw Normal View History

import chromadb
from langchain.text_splitter import CharacterTextSplitter
import hashlib
import time
import json
import fileCon
import redis
#chroma_client = chromadb.Client()
# Module-level ChromaDB client that persists its data under a hard-coded local path.
chroma_client = chromadb.PersistentClient(path=r"E:\Code\saw-ai\chroma") # use persistent storage
# Default collection used by add_text_to_chroma() and query_chroma().
collection = chroma_client.get_or_create_collection("test_collection")
# Shared Redis client (db 1); decode_responses=True asks redis-py to return str instead of bytes.
r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
def add_text_to_chroma(text, metadata=None):
    """Split ``text`` into chunks and add them to the default ChromaDB collection.

    Args:
        text: Raw text to index.
        metadata: Optional metadata dict attached to every chunk. A falsy
            value (``None`` or ``{}``) means "no metadata".

    Returns:
        None. This is a best-effort helper: any error is printed, not raised.
    """
    try:
        # Split the text into chunks.
        text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20)
        documents = text_splitter.split_text(text)
        if not documents:
            # Nothing to index. Without this guard the documents[0] access
            # below fails with a misleading "list index out of range" error.
            print("文本为空,未添加任何内容")
            return
        print("documents_:", len(documents[0]))

        # Build unique ids: the timestamp makes ids differ between calls even
        # when the same text is added twice.
        timestamp = str(int(time.time() * 1000))
        ids = [
            hashlib.sha256((doc + timestamp + str(i)).encode()).hexdigest()
            for i, doc in enumerate(documents)
        ]
        print("ids:", len(ids[0]))

        # Add the chunks (and metadata, when provided) to the collection.
        # Passing None instead of a list of None entries uses the API's own
        # "no metadata" path.
        collection.add(
            documents=documents,
            metadatas=[metadata] * len(documents) if metadata else None,
            ids=ids,
        )
        print("文本已成功添加到 ChromaDB")
        print("集合中的文档数量:", collection.count())
    except Exception as e:
        print(f"添加文本到 ChromaDB 时发生错误: {e}")
def query_chroma(query_text, n_results=5):
    """Query the default collection for chunks similar to ``query_text``.

    Args:
        query_text: Text to search for.
        n_results: Maximum number of results requested.

    Returns:
        The result object returned by ``collection.query``, or ``None`` when
        the query raised (the error is printed).
    """
    try:
        return collection.query(query_texts=[query_text], n_results=n_results)
    except Exception as err:
        print(f"查询 ChromaDB 时发生错误: {err}")
    return None
def testChroma():
    """Smoke test: index one sample sentence, then query it back and print the result."""
    # Index a sample text with its source metadata.
    add_text_to_chroma("这是一个测试文本。", {"source": "test.txt"})

    # Query the collection and dump the result as JSON.
    hits = query_chroma("测试")
    print("查询结果:", json.dumps(hits, ensure_ascii=False, indent=4))
    if not hits:
        return
    print("查询结果:", hits)
def KBaseTextAdd(kbase, userID):
    """Index every file listed in a knowledge base into its ChromaDB collection.

    Args:
        kbase: Mapping with at least ``"FileIDs"`` (a JSON-encoded list of
            ``{"file_id": ...}`` objects) and ``"UUID"`` (used as the
            collection name).
        userID: User id passed to ``fileCon.get_user_file_info``.

    Returns:
        None. A file whose info or content cannot be loaded, or that yields
        no chunks, is skipped with a message so the remaining files are
        still indexed.
    """
    fileIDs = json.loads(kbase["FileIDs"])
    collection_name = kbase["UUID"]
    print("collection_name:", collection_name)
    kcollection = chroma_client.get_or_create_collection(collection_name)
    # The splitter does not depend on the file, so build it once.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20, separator=".")

    for userFileID in fileIDs:
        file_id = userFileID["file_id"]

        # Look up the user's file record.
        userFile = fileCon.get_user_file_info(userID, file_id)
        if userFile is None:
            print("获取文件信息失败,userfileID:", file_id)
            continue

        # Fetch the file content.
        data = fileCon.get_file_content(file_id)
        if data is None or len(data) == 0:
            print("获取文件内容失败,file_id:", file_id)
            continue
        text = data[0]["FileContent"]
        if not text:
            print("文件内容为空,file_id:", file_id)
            continue
        print("text:", text[:100])
        print("text length:", len(text), "file_id:", file_id)

        # Split the text into chunks.
        documents = text_splitter.split_text(text)
        if not documents:
            # Guard the documents[0] / metadata[0] accesses below.
            print("文本分割结果为空,file_id:", file_id)
            continue
        print("split documents:", len(documents[0]), " length:", len(documents))

        # Ids are derived from chunk text + chunk index only.
        # NOTE(review): identical chunks at the same index in two different
        # files therefore share an id — confirm whether that is intended.
        ids = [
            hashlib.sha256((doc + str(i)).encode()).hexdigest()
            for i, doc in enumerate(documents)
        ]
        metadata = [
            {"source": userFile["UserFileName"], "user_file_id": file_id, "id": chunk_id}
            for chunk_id in ids
        ]
        print("metadata:", metadata[0], " length:", len(metadata))
        print("documents:", len(documents[0]), " length:", len(documents))
        print("ids:", len(ids[0]), " length:", len(ids))

        kcollection.add(
            documents=documents,
            metadatas=metadata,
            ids=ids,
        )
        print("文本已成功添加到 ChromaDB")
def KBaseTextAddByUserFileID(collection_name, userID, file_id):
    """Index one file into a collection and record its id in Redis.

    Args:
        collection_name: Name of the ChromaDB collection (created if missing).
        userID: User id passed to ``fileCon.get_user_file_info``.
        file_id: Id of the file to load and index.

    Returns:
        ``"success"`` once the chunks are added and the file id is stored in
        the Redis set ``kbase_py_<collection_name>_file_id_set``; ``None``
        when the file info or content is unavailable or yields no chunks.
    """
    kcollection = chroma_client.get_or_create_collection(collection_name)

    # Look up the user's file record.
    userFile = fileCon.get_user_file_info(userID, file_id)
    if userFile is None:
        print("获取文件信息失败,userfileID:", file_id)
        return None

    # Fetch the file content.
    data = fileCon.get_file_content(file_id)
    if data is None or len(data) == 0:
        print("获取文件内容失败,file_id:", file_id)
        return None
    text = data[0]["FileContent"]
    if not text:
        print("文件内容为空,file_id:", file_id)
        return None
    print("text:", text[:100])
    print("text length:", len(text), "file_id:", file_id)

    # Split the text into chunks.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20, separator=".")
    documents = text_splitter.split_text(text)
    if not documents:
        # Guard the documents[0] / metadata[0] accesses below.
        print("文本分割结果为空,file_id:", file_id)
        return None
    print("split documents:", len(documents[0]), " length:", len(documents))

    # Ids are derived from chunk text + chunk index only.
    # NOTE(review): identical chunks at the same index in two different files
    # therefore share an id — confirm whether that is intended.
    ids = [
        hashlib.sha256((doc + str(i)).encode()).hexdigest()
        for i, doc in enumerate(documents)
    ]
    metadata = [
        {"source": userFile["UserFileName"], "user_file_id": file_id, "id": chunk_id}
        for chunk_id in ids
    ]
    print("metadata:", metadata[0], " length:", len(metadata))
    print("documents:", len(documents[0]), " length:", len(documents))
    print("ids:", len(ids[0]), " length:", len(ids))

    kcollection.add(
        documents=documents,
        metadatas=metadata,
        ids=ids,
    )

    # Remember in Redis that this file is indexed for the collection.
    key = "kbase_py_"+collection_name+"_file_id_set"
    r.sadd(key, file_id)
    print("文本已成功添加到 ChromaDB,文件ID集合已成功添加到Redis:",key)
    return "success"
def KBaseQuery(kbase, query_text, n_results=3):
    """Query the collection that belongs to a knowledge base.

    Args:
        kbase: Mapping whose ``"UUID"`` entry names the collection.
        query_text: Text to search for.
        n_results: Maximum number of results requested.

    Returns:
        The result object returned by the collection's ``query``, or ``None``
        when anything in the lookup or query raised (the error is printed).
    """
    try:
        name = kbase["UUID"]
        print("collection_name:", name)
        target = chroma_client.get_or_create_collection(name)
        return target.query(query_texts=[query_text], n_results=n_results)
    except Exception as err:
        print(f"查询 ChromaDB 时发生错误: {err}")
    return None
def readKBaseRedisFileIDSet(uuid):
    """Read the set of indexed file ids for a knowledge base from Redis.

    Args:
        uuid: Knowledge-base UUID; the set lives under
            ``kbase_py_<uuid>_file_id_set``.

    Returns:
        The members of the set (``str`` values, because the shared client is
        created with ``decode_responses=True``), or ``None`` when the read
        failed (the error is printed).
    """
    try:
        # Reuse the module-level client: it has the same host/port/db settings
        # and avoids building a new connection pool on every call.
        key = "kbase_py_"+uuid+"_file_id_set"
        print("read key:", key)
        return r.smembers(key)
    except Exception as e:
        print(f"读取Redis文件ID集合时发生错误: {e}")
        return None
def writeKBaseRedisFileIDSet(uuid, file_id_set):
    """Add file ids to the Redis set of indexed files for a knowledge base.

    Args:
        uuid: Knowledge-base UUID; the set lives under
            ``kbase_py_<uuid>_file_id_set``.
        file_id_set: Iterable of file ids to add.

    Returns:
        ``None`` on success (including when there is nothing to write),
        ``"error"`` when the write failed (the error is printed).
    """
    try:
        # Write to the same key that readKBaseRedisFileIDSet reads; the
        # previous code wrote to "kbase_py_<uuid>" so the data was never seen.
        key = "kbase_py_"+uuid+"_file_id_set"
        print("write key:", key)
        members = list(file_id_set) if file_id_set else []
        if not members:
            # SADD needs at least one member; an empty call is rejected.
            print("文件ID集合为空,无需写入Redis")
            return None
        # Reuse the module-level client instead of opening a new one per call.
        r.sadd(key, *members)
        print("文件ID集合已成功写入Redis")
    except Exception as e:
        print(f"写入Redis文件ID集合时发生错误: {e}")
        return "error"
    return None
def deleteCollectionFileByUserFileID(collection_name, file_id):
    """Remove one file's chunks from a collection and from the Redis id set.

    Args:
        collection_name: Name of the ChromaDB collection.
        file_id: Id of the file whose chunks should be removed. May be an
            ``int`` (as parsed from JSON) or a ``str`` (as read from Redis).

    Returns:
        ``"success"`` when both deletions ran, ``None`` when an error
        occurred (the error is printed).
    """
    try:
        kcollection = chroma_client.get_or_create_collection(collection_name)

        # The "user_file_id" metadata keeps whatever type the indexer was
        # given, while ids coming back from Redis are always str. Metadata
        # equality is type-sensitive, so match both representations.
        candidates = [file_id]
        if isinstance(file_id, str):
            try:
                candidates.append(int(file_id))
            except ValueError:
                pass
        elif isinstance(file_id, int) and not isinstance(file_id, bool):
            candidates.append(str(file_id))

        if len(candidates) == 1:
            where = {"user_file_id": file_id}
        else:
            where = {"$or": [{"user_file_id": value} for value in candidates]}
        kcollection.delete(where=where)

        # Remove the id from the same Redis set the indexer writes to; the
        # previous code used the bare collection name as the key.
        key = "kbase_py_" + collection_name + "_file_id_set"
        r.srem(key, file_id)
        print("文件ID集合已成功删除")
        return "success"
    except Exception as e:
        print(f"删除文件ID集合时发生错误: {e}")
        return None
def KBaseChroma(data):
    """Sync a knowledge base's index with its current file list, then query it.

    Args:
        data: Request mapping with ``"knowledge_base"`` (needs ``"UUID"`` and
            a JSON-encoded ``"FileIDs"`` list of ``{"file_id": ...}``) and
            ``"im_context"`` (needs ``"user_id"`` and ``"question"``).

    Returns:
        Whatever ``KBaseQuery`` returns for the question (top 3 results).
    """
    kbase = data["knowledge_base"]
    userID = data["im_context"]["user_id"]
    query_text = data["im_context"]["question"]
    fileIDs = json.loads(kbase["FileIDs"])

    # Compare the requested file ids with the ids recorded as indexed, so
    # that only the difference is added or removed.
    fileIDSet = readKBaseRedisFileIDSet(kbase["UUID"])
    print("fileIDSet:", fileIDSet)

    if fileIDSet is None:
        # The Redis read failed, so the indexed state is unknown. Skip the
        # sync rather than crash or guess, and still answer the query.
        print("无法读取已索引的文件ID集合,跳过同步")
    else:
        # Build the full set of wanted ids BEFORE touching anything, so the
        # deletion step below never sees a partial set.
        s2 = {str(fileID["file_id"]) for fileID in fileIDs}

        # Index every file that is wanted but not yet recorded.
        for fileID in fileIDs:
            if str(fileID["file_id"]) in fileIDSet:
                print("文件ID已存在:", fileID["file_id"])
                continue
            KBaseTextAddByUserFileID(kbase["UUID"], userID, fileID["file_id"])

        # Ids recorded as indexed but no longer wanted must be removed.
        sdel = fileIDSet - s2
        print("sdel:", sdel)
        for fileID in sdel:
            deleteCollectionFileByUserFileID(kbase["UUID"], fileID)

    results = KBaseQuery(kbase, query_text, 3)
    return results
# if __name__ == "__main__":
# s = r'{"im_context":{"user_id":1,"session_id":277,"function_id":6,"model_id":2,"model_type":"ollama","question":"test","channel":"user_1_ai_chat_msg_95aa6472-aebd-424f-bdae-954bb537b813","is_has_image":false},"knowledge_base":{"ID":3,"Name":"test2","Description":"test2","UUID":"0ba47d27-767b-40d4-91e6-cb1adfdcfcff","SessionID":277,"CreatedAt":"","UpdatedAt":"","DeletedAt":"","AuthID":0,"FileNameList":"","FileIDs":"[{\"file_id\":6}]"}}'
# res = json.loads(s)
# KBaseChroma(res)
# kbase = res["knowledge_base"]
# print("kbase:", kbase)
# res = deleteCollectionFileByUserFileID(kbase["UUID"], 6)
# print("deleteCollectionFileByUserFileID:", res)
# results = KBaseChroma(res)
# print("查询结果:", results)
# # userID = res["im_context"]["user_id"]
# # KBaseTextAdd(kbase, userID)
# results = KBaseQuery(kbase, "AI grade your essays", 3)
# print("查询结果:", results)
# testChroma()
# path = r"E:\Code\saw-ai\file\人生 (路遥).txt"
# text = ""
# try:
# with open(path, 'r', encoding='utf-8') as file:
# text = file.read()
# except FileNotFoundError:
# print(f"文件 {path} 未找到。")
# except Exception as e:
# print(f"读取文件时发生错误: {e}")
# metadata = {"source": "人生 (路遥).txt"}
# add_text_to_chroma(text, metadata)
# Example: query ChromaDB
# query_text = "德顺爷爷"
# results = query_chroma(query_text)
# if results:
# # result的key
# for key in results.keys():
# print("key:", key)
# print("查询结果:", len(results["documents"][0]) if results["documents"] else 0)
# # 打印查询结果
# print("查询结果:", results)