saw-go/kbase_py/fileChroma.py

293 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import chromadb
from langchain.text_splitter import CharacterTextSplitter
import hashlib
import time
import json
import fileCon
import redis
# In-memory client kept for reference; the persistent client below is used instead.
#chroma_client = chromadb.Client()
# NOTE(review): the storage path is a hard-coded absolute Windows path — confirm it is valid wherever this module runs.
chroma_client = chromadb.PersistentClient(path=r"E:\Code\saw-ai\chroma") # use persistent storage
# Default collection used by add_text_to_chroma() and query_chroma().
collection = chroma_client.get_or_create_collection("test_collection")
# Module-wide Redis client (db 1) used to track which file ids have been indexed per collection.
r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
def add_text_to_chroma(text, metadata=None):
    """Split ``text`` into chunks and add them to the default ChromaDB collection.

    Args:
        text: Raw text to index.
        metadata: Optional metadata dict; when truthy, the same dict is
            attached to every chunk.

    Returns:
        None. Any failure is reported on stdout and not re-raised, so the
        call is best-effort.
    """
    try:
        # Split the text into chunks.
        text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20)
        documents = text_splitter.split_text(text)
        if not documents:
            # Nothing to index (e.g. empty text). Stop here so the diagnostics
            # below do not index into an empty list and add() is not called
            # with empty input.
            print("文本分割结果为空,未添加任何内容")
            return
        print("documents_:", len(documents[0]))
        # Build one id per chunk from the chunk text, a millisecond timestamp
        # and the chunk index, so repeated calls produce fresh ids.
        timestamp = str(int(time.time() * 1000))
        ids = [
            hashlib.sha256((doc + timestamp + str(i)).encode()).hexdigest()
            for i, doc in enumerate(documents)
        ]
        print("ids:", len(ids[0]))
        # Store the chunks together with their metadata in the collection.
        collection.add(
            documents=documents,
            metadatas=[metadata] * len(documents) if metadata else [None] * len(documents),
            ids=ids,
        )
        print("文本已成功添加到 ChromaDB")
        print("集合中的文档数量:", collection.count())
    except Exception as e:
        print(f"添加文本到 ChromaDB 时发生错误: {e}")
def query_chroma(query_text, n_results=5):
    """Run a similarity query against the default ChromaDB collection.

    Args:
        query_text: Text to search for.
        n_results: Maximum number of matches requested from the collection.

    Returns:
        Whatever ``collection.query`` returns, or None if it raised.
    """
    try:
        hits = collection.query(query_texts=[query_text], n_results=n_results)
    except Exception as err:
        print(f"查询 ChromaDB 时发生错误: {err}")
        return None
    return hits
def testChroma():
    """Smoke test: index one sample sentence, query it back and print the result."""
    # Index a sample text with a source tag.
    sample_metadata = {"source": "test.txt"}
    add_text_to_chroma("这是一个测试文本。", sample_metadata)
    # Query the collection and dump the result as JSON, then raw if non-empty.
    hits = query_chroma("测试")
    print("查询结果:", json.dumps(hits, ensure_ascii=False, indent=4))
    if hits:
        print("查询结果:", hits)
def KBaseTextAdd(kbase,userID):
    """Index every file listed in a knowledge base into its Chroma collection.

    Args:
        kbase: Knowledge-base record. ``kbase["FileIDs"]`` is a JSON string
            decoding to a list of objects with a ``file_id`` key, and
            ``kbase["UUID"]`` is used as the collection name.
        userID: User id passed to ``fileCon.get_user_file_info``.

    Files whose info or content cannot be loaded, or whose content yields no
    chunks, are skipped with a message so the remaining files still get
    indexed.
    """
    fileIDs = json.loads(kbase["FileIDs"])
    collection_name = kbase["UUID"]
    print("collection_name:", collection_name)
    kcollection = chroma_client.get_or_create_collection(collection_name)
    # Loop-invariant: one splitter serves every file.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20, separator=".")
    for userFileID in fileIDs:
        file_id = userFileID["file_id"]
        # Look up the user's file record (provides the display name).
        userFile = fileCon.get_user_file_info(userID, file_id)
        if userFile is None:
            print("获取文件信息失败,userfileID:", file_id)
            continue
        # Load the file content; an empty result is treated like a failure
        # because data[0] below would otherwise raise IndexError.
        data = fileCon.get_file_content(file_id)
        if not data:
            print("获取文件内容失败,file_id:", file_id)
            continue
        text = data[0]["FileContent"]
        # Split into chunks; empty/None content produces no chunks.
        documents = text_splitter.split_text(text) if text else []
        if not documents:
            print("文件内容为空,跳过,file_id:", file_id)
            continue
        print("text:", text[:100])
        print("text length:", len(text), "file_id:", file_id)
        print("split documents:", len(documents[0]), " length:", len(documents))
        # Ids are derived from chunk text + chunk index only.
        # NOTE(review): identical chunks at the same index in two files get
        # the same id — confirm whether file_id should be part of the hash.
        ids = [
            hashlib.sha256((doc + str(i)).encode()).hexdigest()
            for i, doc in enumerate(documents)
        ]
        # One metadata dict per chunk; user_file_id is what deletion filters on.
        metadata = [
            {"source": userFile["UserFileName"], "user_file_id": file_id, "id": chunk_id}
            for chunk_id in ids
        ]
        print("metadata:", metadata[0], " length:", len(metadata))
        print("documents:", len(documents[0]), " length:", len(documents))
        print("ids:", len(ids[0]), " length:", len(ids))
        kcollection.add(
            documents=documents,
            metadatas=metadata,
            ids=ids,
        )
        print("文本已成功添加到 ChromaDB")
def KBaseTextAddByUserFileID(collection_name,userID,file_id):
    """Index a single file into a collection and record its id in Redis.

    Args:
        collection_name: Name of the Chroma collection (created if missing).
        userID: User id passed to ``fileCon.get_user_file_info``.
        file_id: Id of the file to load and index.

    Returns:
        "success" when the chunks were added and the file id was recorded in
        the Redis set ``kbase_py_<collection_name>_file_id_set``; None when
        the file info or content could not be loaded or produced no chunks.
    """
    kcollection = chroma_client.get_or_create_collection(collection_name)
    # Look up the user's file record (provides the display name).
    userFile = fileCon.get_user_file_info(userID, file_id)
    if userFile is None:
        print("获取文件信息失败,userfileID:", file_id)
        return None
    # Load the file content; an empty result is treated like a failure
    # because data[0] below would otherwise raise IndexError.
    data = fileCon.get_file_content(file_id)
    if not data:
        print("获取文件内容失败,file_id:", file_id)
        return None
    text = data[0]["FileContent"]
    # Split into chunks; empty/None content produces no chunks.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20, separator=".")
    documents = text_splitter.split_text(text) if text else []
    if not documents:
        # Nothing was indexed, so the id is deliberately not recorded in Redis.
        print("文件内容为空,跳过,file_id:", file_id)
        return None
    print("text:", text[:100])
    print("text length:", len(text), "file_id:", file_id)
    print("split documents:", len(documents[0]), " length:", len(documents))
    # Ids are derived from chunk text + chunk index only.
    # NOTE(review): identical chunks at the same index in two files get the
    # same id — confirm whether file_id should be part of the hash.
    ids = [
        hashlib.sha256((doc + str(i)).encode()).hexdigest()
        for i, doc in enumerate(documents)
    ]
    # One metadata dict per chunk; user_file_id is what deletion filters on.
    metadata = [
        {"source": userFile["UserFileName"], "user_file_id": file_id, "id": chunk_id}
        for chunk_id in ids
    ]
    print("metadata:", metadata[0], " length:", len(metadata))
    print("documents:", len(documents[0]), " length:", len(documents))
    print("ids:", len(ids[0]), " length:", len(ids))
    kcollection.add(
        documents=documents,
        metadatas=metadata,
        ids=ids,
    )
    # Remember that this file is now indexed for the collection.
    key = "kbase_py_"+collection_name+"_file_id_set"
    r.sadd(key, file_id)
    print("文本已成功添加到 ChromaDB,文件ID集合已成功添加到Redis:",key)
    return "success"
def KBaseQuery(kbase, query_text, n_results=3):
    """Query the Chroma collection that belongs to a knowledge base.

    Args:
        kbase: Knowledge-base record; ``kbase["UUID"]`` names the collection.
        query_text: Text to search for.
        n_results: Maximum number of matches requested from the collection.

    Returns:
        Whatever the collection's ``query`` returns, or None if anything in
        the lookup or query raised.
    """
    try:
        name = kbase["UUID"]
        print("collection_name:", name)
        target = chroma_client.get_or_create_collection(name)
        hits = target.query(query_texts=[query_text], n_results=n_results)
    except Exception as err:
        print(f"查询 ChromaDB 时发生错误: {err}")
        return None
    return hits
def readKBaseRedisFileIDSet(uuid):
    """Fetch the members of the Redis set ``kbase_py_<uuid>_file_id_set``.

    Args:
        uuid: Knowledge-base UUID used to build the set key.

    Returns:
        The result of ``smembers`` for that key, or None if anything raised.
    """
    try:
        # A fresh client is created on every call.
        client = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
        set_key = "kbase_py_" + uuid + "_file_id_set"
        print("read key:", set_key)
        members = client.smembers(set_key)
    except Exception as err:
        print(f"读取Redis文件ID集合时发生错误: {err}")
        return None
    return members
def writeKBaseRedisFileIDSet(uuid, file_id_set):
    """Add file ids to the Redis set ``kbase_py_<uuid>_file_id_set``.

    Args:
        uuid: Knowledge-base UUID used to build the set key.
        file_id_set: Iterable of file ids to add. An empty or None value is
            a no-op.

    Returns:
        None on success (including the no-op case), "error" if anything
        raised.
    """
    try:
        # Connect to Redis.
        r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
        key = "kbase_py_"+uuid+"_file_id_set"
        print("write key:", key)
        if not file_id_set:
            # SADD needs at least one member; there is nothing to write.
            return None
        # Write to the same key that readKBaseRedisFileIDSet reads; the
        # previous code wrote to "kbase_py_"+uuid, which is never read back.
        r.sadd(key, *file_id_set)
        print("文件ID集合已成功写入Redis")
    except Exception as e:
        print(f"写入Redis文件ID集合时发生错误: {e}")
        return "error"
    return None
def deleteCollectionFileByUserFileID(collection_name, file_id):
    """Remove one file's chunks from a collection and forget its id in Redis.

    Args:
        collection_name: Name of the Chroma collection.
        file_id: Id of the file whose chunks should be deleted. May be the
            original value or its string form (as read back from Redis).

    Returns:
        "success" when both the Chroma delete and the Redis update ran,
        None if anything raised.
    """
    try:
        kcollection = chroma_client.get_or_create_collection(collection_name)
        # Chunks carry user_file_id with the type used at indexing time,
        # while ids read back from Redis (decode_responses=True) are strings.
        # Match both representations so the metadata filter actually hits.
        candidates = [file_id]
        if isinstance(file_id, str):
            try:
                candidates.append(int(file_id))
            except ValueError:
                pass  # Not a numeric id; the string form is the only candidate.
        elif isinstance(file_id, int) and not isinstance(file_id, bool):
            candidates.append(str(file_id))
        if len(candidates) == 1:
            where = {"user_file_id": file_id}
        else:
            where = {"$or": [{"user_file_id": value} for value in candidates]}
        kcollection.delete(where=where)
        # Remove the id from the same set KBaseTextAddByUserFileID adds to;
        # the previous code used the bare collection name as the key, so the
        # id was never actually removed.
        key = "kbase_py_"+collection_name+"_file_id_set"
        r.srem(key, file_id)
        print("文件ID集合已成功删除")
        return "success"
    except Exception as e:
        print(f"删除文件ID集合时发生错误: {e}")
        return None
def KBaseChroma(data):
    """Sync a knowledge base's indexed files with its file list, then query it.

    Args:
        data: Request payload. ``data["knowledge_base"]`` is the
            knowledge-base record (``UUID``, JSON-encoded ``FileIDs``), and
            ``data["im_context"]`` supplies ``user_id`` and ``question``.

    Returns:
        The result of ``KBaseQuery`` for the question (top 3 matches), which
        is None if the query failed.
    """
    kbase = data["knowledge_base"]
    userID = data["im_context"]["user_id"]
    query_text = data["im_context"]["question"]
    fileIDs = json.loads(kbase["FileIDs"])
    # Compare the requested file ids with those recorded as indexed in Redis.
    fileIDSet = readKBaseRedisFileIDSet(kbase["UUID"])
    print("fileIDSet:", fileIDSet)
    if fileIDSet is None:
        # Redis could not be read: the indexed state is unknown, so skip the
        # add/delete sync rather than crash, and query the collection as-is.
        print("读取Redis文件ID集合失败,跳过文件同步")
    else:
        indexed = set(fileIDSet)
        # Map the string form (as stored in Redis) to the original id value.
        # Built completely before any add/delete so the deletion diff below
        # sees every requested id; the previous loop stopped after the first
        # new file and then deleted files that were still listed.
        wanted = {str(entry["file_id"]): entry["file_id"] for entry in fileIDs}
        for id_key, file_id in wanted.items():
            if id_key in indexed:
                print("文件ID已存在:", file_id)
                continue
            # Index every file that is not yet indexed.
            KBaseTextAddByUserFileID(kbase["UUID"], userID, file_id)
        # Ids that are indexed but no longer listed must be removed.
        sdel = indexed - set(wanted)
        print("sdel:", sdel)
        for fileID in sdel:
            deleteCollectionFileByUserFileID(kbase["UUID"], fileID)
    results = KBaseQuery(kbase, query_text, 3)
    return results
# if __name__ == "__main__":
# s = r'{"im_context":{"user_id":1,"session_id":277,"function_id":6,"model_id":2,"model_type":"ollama","question":"test","channel":"user_1_ai_chat_msg_95aa6472-aebd-424f-bdae-954bb537b813","is_has_image":false},"knowledge_base":{"ID":3,"Name":"test2","Description":"test2","UUID":"0ba47d27-767b-40d4-91e6-cb1adfdcfcff","SessionID":277,"CreatedAt":"","UpdatedAt":"","DeletedAt":"","AuthID":0,"FileNameList":"","FileIDs":"[{\"file_id\":6}]"}}'
# res = json.loads(s)
# KBaseChroma(res)
# kbase = res["knowledge_base"]
# print("kbase:", kbase)
# res = deleteCollectionFileByUserFileID(kbase["UUID"], 6)
# print("deleteCollectionFileByUserFileID:", res)
# results = KBaseChroma(res)
# print("查询结果:", results)
# # userID = res["im_context"]["user_id"]
# # KBaseTextAdd(kbase, userID)
# results = KBaseQuery(kbase, "AI grade your essays", 3)
# print("查询结果:", results)
# testChroma()
# path = r"E:\Code\saw-ai\file\人生 (路遥).txt"
# text = ""
# try:
# with open(path, 'r', encoding='utf-8') as file:
# text = file.read()
# except FileNotFoundError:
# print(f"文件 {path} 未找到。")
# except Exception as e:
# print(f"读取文件时发生错误: {e}")
# metadata = {"source": "人生 (路遥).txt"}
# add_text_to_chroma(text, metadata)
# 示例:查询 ChromaDB
# query_text = "德顺爷爷"
# results = query_chroma(query_text)
# if results:
# # result的key
# for key in results.keys():
# print("key:", key)
# print("查询结果:", len(results["documents"][0]) if results["documents"] else 0)
# # 打印查询结果
# print("查询结果:", results)