添加消息转文件功能,添加kbase python部分

This commit is contained in:
junleea 2025-04-10 16:06:08 +08:00
parent d17facf9f8
commit 6d4aa34442
7 changed files with 710 additions and 0 deletions

View File

@ -23,6 +23,7 @@ func SetUpFileGroup(router *gin.Engine) {
fileGroup.POST("/find_file_content", FindFileContent)
fileGroup.POST("/create_file_content", CreateFileContent)
fileGroup.POST("/get_file_will_convert_content", GetFileWillConvertContent) //需要将文件转为文件内容的文件列表接口
fileGroup.POST("/convert_msg_to_file", ConvertMsgToFile) //将消息转为文件
}
@ -359,3 +360,25 @@ func GetFileWillConvertContent(c *gin.Context) {
}
c.JSON(http.StatusOK, resp)
}
// ConvertMsgToFile handles POST /file/convert_msg_to_file: convert a chat
// message into a file for the authenticated user.
//
// NOTE(review): the body looks copy-pasted from GetFileWillConvertContent —
// it binds MessageConvertFileReq but never uses any of req's fields, calls
// service.GetFileWillConvertContentFileList, and reports "find file content
// failed" on error. Confirm the intended service call before release.
func ConvertMsgToFile(c *gin.Context) {
	id, _ := c.Get("id") // user id injected by the auth middleware
	userId := int(id.(float64))
	var req proto.MessageConvertFileReq
	var resp proto.GenerateResp
	if err := c.ShouldBind(&req); err == nil {
		// NOTE(review): req.MessageID / req.FileType / req.FileName are unused here.
		files, err2 := service.GetFileWillConvertContentFileList(userId)
		if err2 != nil {
			resp.Code = proto.ParameterError
			resp.Message = "find file content failed:" + err2.Error()
		} else {
			resp.Code = proto.SuccessCode
			resp.Message = "success"
			resp.Data = files
		}
	} else {
		resp.Code = proto.ParameterError
		resp.Message = "message convert file form parameter decode error:" + err.Error()
	}
	c.JSON(http.StatusOK, resp)
}

141
kbase_py/fileBase.py Normal file
View File

@ -0,0 +1,141 @@
import os
import docx
import PyPDF2
from epub_conversion.utils import open_book, convert_epub_to_lines
import subprocess
import ebooklib
from ebooklib import epub
from bs4 import BeautifulSoup
import requests
def extract_text_from_txt(file_path):
    """Read a UTF-8 text file and return its full contents, or None on failure."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as e:
        print(f"读取 TXT 文件时出错: {e}")
        return None
def extract_text_from_docx(file_path):
    """Extract the text of every paragraph from a DOCX file.

    Returns the paragraphs joined by newlines, or None when the file
    cannot be parsed.
    """
    try:
        document = docx.Document(file_path)
        return '\n'.join(paragraph.text for paragraph in document.paragraphs)
    except Exception as e:
        print(f"读取 DOCX 文件时出错: {e}")
        return None
def extract_text_from_pdf(file_path):
    """Concatenate the extracted text of every page in a PDF file.

    Returns None when the file cannot be opened or parsed.
    """
    try:
        with open(file_path, 'rb') as handle:
            reader = PyPDF2.PdfReader(handle)
            pages = [page.extract_text() for page in reader.pages]
        return "".join(pages)
    except Exception as e:
        print(f"读取 PDF 文件时出错: {e}")
        return None
def extract_text_from_epub(epub_path, txt_path=None):
    """Extract the plain text of an EPUB book.

    Parameters:
        epub_path: path of the EPUB file to read.
        txt_path: optional path; when given, the extracted text is also
            written there (the original mandatory-argument behavior).

    Returns the extracted text, or None on failure.

    BUG FIX: txt_path is now optional and the text is returned — extract_text()
    calls this function with a single argument, which previously raised
    TypeError, and the original returned nothing at all.
    """
    try:
        book = epub.read_epub(epub_path)
        full_text = ""
        for item in book.get_items():
            # Only HTML document items carry readable book content.
            if item.get_type() == ebooklib.ITEM_DOCUMENT:
                soup = BeautifulSoup(item.content, 'html.parser')
                full_text += soup.get_text() + "\n"
        if txt_path is not None:
            with open(txt_path, 'w', encoding='utf-8') as file:
                file.write(full_text)
        return full_text
    except Exception as e:
        # Log-and-return-None matches the sibling extractors' contract.
        print(f"读取 EPUB 文件时出错: {e}")
        return None
def extract_text_from_doc(file_path):
    """Extract text from a legacy DOC file.

    Converts the file to DOCX with a headless LibreOffice run, extracts the
    text from the conversion result, then removes the temporary DOCX.
    Returns the text, or None when conversion or extraction fails.
    """
    # LibreOffice writes <name>.docx next to the source file.
    docx_path = os.path.splitext(file_path)[0] + '.docx'
    try:
        result = subprocess.run(['soffice', '--headless', '--convert-to', 'docx',
                                 file_path, '--outdir', os.path.dirname(file_path)])
        # The original blindly read docx_path; verify the conversion actually
        # produced a file before trying to parse it.
        if result.returncode != 0 or not os.path.exists(docx_path):
            print(f"DOC 转换失败: {file_path}")
            return None
        return extract_text_from_docx(docx_path)
    except Exception as e:
        print(f"读取 DOC 文件时出错: {e}")
        return None
    finally:
        # Always clean up the temporary DOCX, even when extraction raised.
        if os.path.exists(docx_path):
            os.remove(docx_path)
def extract_text(file_path):
    """Dispatch to the extractor that matches the file's extension.

    Supported: .txt, .docx, .pdf, .epub, .doc. Returns the extracted text,
    or None for unsupported formats or extraction failure.
    """
    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension == '.txt':
        return extract_text_from_txt(file_path)
    elif file_extension == '.docx':
        return extract_text_from_docx(file_path)
    elif file_extension == '.pdf':
        return extract_text_from_pdf(file_path)
    elif file_extension == '.epub':
        # BUG FIX: extract_text_from_epub takes a target TXT path as its
        # second argument; the original one-argument call raised TypeError.
        # Derive the sidecar TXT path from the EPUB path.
        return extract_text_from_epub(file_path, os.path.splitext(file_path)[0] + '.txt')
    elif file_extension == '.doc':
        return extract_text_from_doc(file_path)
    else:
        print(f"不支持的文件格式: {file_extension}")
        return None
# # 示例使用
# file_path = '/home/saw/file/人生 (路遥).epub' # 替换为实际的文件路径
# text = extract_text(file_path)
# if text:
# print(text[:1000]) # 打印前 1000 个字符
def get_file_content_by_path(file_path):
    """Return the extracted text of the file at `file_path`, or None.

    Thin wrapper over extract_text() that logs when nothing was extracted.
    """
    content = extract_text(file_path)
    if not content:
        print("未能获取文件内容")
        return None
    return content
def dowload_file(file_url, save_path):
    """Download `file_url` and save the body to `save_path`.

    Best-effort: failures (HTTP error status, network error, timeout) are
    logged and swallowed; the function always returns None.
    NOTE: the typo'd name is kept — existing callers use `dowload_file`.
    """
    try:
        # timeout prevents an unresponsive server from hanging the caller
        # forever; the resulting Timeout is handled by the except below.
        response = requests.get(file_url, timeout=60)
        if response.status_code == 200:
            with open(save_path, 'wb') as file:
                file.write(response.content)
            print(f"文件下载成功: {save_path}")
        else:
            print(f"下载失败,状态码: {response.status_code}")
    except Exception as e:
        print(f"下载文件时出错: {e}")
#filepath为fileurl,从网络下载
def get_file_content(fileName):
    """Download `fileName` from the remote file server and return its text.

    The file is fetched into /home/saw/file/ and then run through
    extract_text(); returns None when extraction yields nothing.
    """
    save_path = '/home/saw/file/' + fileName
    file_url = 'https://pm.ljsea.top/tool/file/' + fileName
    dowload_file(file_url, save_path)
    content = extract_text(save_path)
    if not content:
        print("未能获取文件内容")
        return None
    return content

293
kbase_py/fileChroma.py Normal file
View File

@ -0,0 +1,293 @@
import chromadb
from langchain.text_splitter import CharacterTextSplitter
import hashlib
import time
import json
import fileCon
import redis
#chroma_client = chromadb.Client()
# Persistent Chroma client — collections survive process restarts.
# NOTE(review): hard-coded Windows path; should come from configuration.
chroma_client = chromadb.PersistentClient(path=r"E:\Code\saw-ai\chroma") # persistent storage
# Default collection used by the standalone add/query helpers below.
collection = chroma_client.get_or_create_collection("test_collection")
# Redis connection (db=1) tracking which file ids each collection has ingested.
r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
def add_text_to_chroma(text, metadata=None):
    """Split `text` into chunks and add them to the module-level collection.

    Each chunk id is sha256(chunk + millisecond timestamp + index), so
    repeated calls produce distinct ids. Errors are logged and swallowed.
    """
    try:
        text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20)
        documents = text_splitter.split_text(text)
        if not documents:
            # Empty input: the original indexed documents[0] and aborted via
            # the generic except; return cleanly instead.
            print("没有可添加的文本")
            return
        print("documents_:", len(documents[0]))
        ids = []
        timestamp = str(int(time.time() * 1000))
        for i, doc in enumerate(documents):
            unique_string = doc + timestamp + str(i)
            ids.append(hashlib.sha256(unique_string.encode()).hexdigest())
        print("ids:", len(ids[0]))
        # Chroma expects metadatas=None (not a list of Nones) when there is
        # no metadata to attach.
        collection.add(
            documents=documents,
            metadatas=[metadata] * len(documents) if metadata else None,
            ids=ids
        )
        print("文本已成功添加到 ChromaDB")
        print("集合中的文档数量:", collection.count())
    except Exception as e:
        print(f"添加文本到 ChromaDB 时发生错误: {e}")
def query_chroma(query_text, n_results=5):
    """Run a similarity query against the module-level collection.

    Returns the raw Chroma result dict, or None on error.
    """
    try:
        return collection.query(query_texts=[query_text], n_results=n_results)
    except Exception as e:
        print(f"查询 ChromaDB 时发生错误: {e}")
        return None
def testChroma():
    """Smoke-test: add one document, query it back, and print the results."""
    sample = "这是一个测试文本。"
    add_text_to_chroma(sample, {"source": "test.txt"})
    results = query_chroma("测试")
    print("查询结果:", json.dumps(results, ensure_ascii=False, indent=4))
    if results:
        print("查询结果:", results)
def KBaseTextAdd(kbase,userID):
    """Ingest every file referenced by kbase["FileIDs"] into the Chroma
    collection named after the knowledge base UUID.

    Files whose info or content cannot be fetched are skipped with a log line.
    NOTE(review): chunk ids are sha256(chunk_text + chunk_index) with no file
    id mixed in — identical chunks at the same index in two different files
    collide and overwrite each other; confirm whether that is intended.
    """
    fileIDs = json.loads(kbase["FileIDs"])  # e.g. [{"file_id": 6}, ...]
    # print("fileIDs:", fileIDs)
    # print("fileIDs type:", type(fileIDs))
    collection_name = kbase["UUID"]  # one Chroma collection per knowledge base
    print("collection_name:", collection_name)
    kcollection = chroma_client.get_or_create_collection(collection_name)
    for userFileID in fileIDs:
        # Look up the user's file record (for the display name) via HTTP API.
        userFile = fileCon.get_user_file_info(userID, userFileID["file_id"])
        if userFile == None:
            print("获取文件信息失败,userfileID:", userFileID["file_id"])
            continue
        #print("userFile:", userFile)
        # file_id = userFile["FileID"]
        file_id = userFileID["file_id"]  # the physical file id
        data = fileCon.get_file_content(file_id)  # fetch the stored file text
        if data == None:
            print("获取文件内容失败,file_id:", file_id)
            continue
        text = data[0]["FileContent"]
        print("text:", text[:100])
        print("text length:", len(text), "file_id:", file_id)
        # Split into ~500-char chunks on "." boundaries with 20-char overlap.
        text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20,separator=".")
        documents = text_splitter.split_text(text)
        print("split documents:", len(documents[0]), " length:", len(documents))
        ids = []
        for i, doc in enumerate(documents):
            unique_string = doc + str(i)
            id_ = hashlib.sha256(unique_string.encode()).hexdigest()
            ids.append(id_)
        # One metadata record per chunk: display name, file id, chunk id.
        metadata = []
        for i in range(len(documents)):
            metadata.append({"source": userFile["UserFileName"], "user_file_id": file_id, "id": ids[i]} )
        print("metadata:", metadata[0], " length:", len(metadata))
        print("documents:", len(documents[0]), " length:", len(documents))
        print("ids:", len(ids[0]), " length:", len(ids))
        # Chroma.from_documents(documents, metadatas=metadata, ids=ids, collection_name=collection_name)
        kcollection.add(
            documents=documents,
            metadatas=metadata,
            ids=ids
        )
        # Chroma.persist()
    print("文本已成功添加到 ChromaDB")
def KBaseTextAddByUserFileID(collection_name,userID,file_id):
    """Ingest a single file into the named Chroma collection and record the
    file id in the Redis set "kbase_py_<collection>_file_id_set".

    Returns "success", or None when the file info/content cannot be fetched.
    NOTE(review): same chunk-id collision concern as KBaseTextAdd — ids hash
    only chunk text + index, not the file id.
    """
    kcollection = chroma_client.get_or_create_collection(collection_name)
    # Look up the user's file record (for the display name) via HTTP API.
    userFile = fileCon.get_user_file_info(userID, file_id)
    if userFile == None:
        print("获取文件信息失败,userfileID:", file_id)
        return None
    data = fileCon.get_file_content(file_id)  # fetch the stored file text
    if data == None:
        print("获取文件内容失败,file_id:", file_id)
        return None
    text = data[0]["FileContent"]
    print("text:", text[:100])
    print("text length:", len(text), "file_id:", file_id)
    # Split into ~500-char chunks on "." boundaries with 20-char overlap.
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20,separator=".")
    documents = text_splitter.split_text(text)
    print("split documents:", len(documents[0]), " length:", len(documents))
    ids = []
    for i, doc in enumerate(documents):
        unique_string = doc + str(i)
        id_ = hashlib.sha256(unique_string.encode()).hexdigest()
        ids.append(id_)
    # One metadata record per chunk: display name, file id, chunk id.
    metadata = []
    for i in range(len(documents)):
        metadata.append({"source": userFile["UserFileName"], "user_file_id": file_id, "id": ids[i]} )
    print("metadata:", metadata[0], " length:", len(metadata))
    print("documents:", len(documents[0]), " length:", len(documents))
    print("ids:", len(ids[0]), " length:", len(ids))
    # Chroma.from_documents(documents, metadatas=metadata, ids=ids, collection_name=collection_name)
    kcollection.add(
        documents=documents,
        metadatas=metadata,
        ids=ids
    )
    # Chroma.persist()
    # Record the ingested file id so KBaseChroma can diff against FileIDs.
    key = "kbase_py_"+collection_name+"_file_id_set"
    r.sadd(key, file_id)
    print("文本已成功添加到 ChromaDB,文件ID集合已成功添加到Redis:",key)
    return "success"
def KBaseQuery(kbase, query_text, n_results=3):
    """Similarity-search the knowledge base's Chroma collection.

    The collection is named by the knowledge base UUID. Returns the raw
    Chroma result dict, or None on error.
    """
    try:
        collection_name = kbase["UUID"]
        print("collection_name:", collection_name)
        target = chroma_client.get_or_create_collection(collection_name)
        return target.query(query_texts=[query_text], n_results=n_results)
    except Exception as e:
        print(f"查询 ChromaDB 时发生错误: {e}")
        return None
def readKBaseRedisFileIDSet(uuid):
    """Return the set of file ids already ingested for knowledge base `uuid`.

    Reads the Redis set "kbase_py_<uuid>_file_id_set" through the
    module-level connection (the per-call connection was redundant).
    Returns an empty set on error — callers (KBaseChroma) apply membership
    and difference operations directly, so a None return would raise.
    """
    try:
        key = "kbase_py_"+uuid+"_file_id_set"
        print("read key:", key)
        return r.smembers(key)
    except Exception as e:
        print(f"读取Redis文件ID集合时发生错误: {e}")
        # Empty set instead of None: callers do `in` and `-` on the result.
        return set()
def writeKBaseRedisFileIDSet(uuid, file_id_set):
    """Add all ids in `file_id_set` to the Redis tracking set for `uuid`.

    Returns None on success and "error" on failure (original contract).
    """
    try:
        key = "kbase_py_"+uuid+"_file_id_set"
        print("write key:", key)
        # BUG FIX: the original SADD'd to "kbase_py_"+uuid instead of `key`,
        # so readKBaseRedisFileIDSet could never see these entries. Also
        # guard the empty case: SADD with zero members raises.
        if file_id_set:
            r.sadd(key, *file_id_set)
        print("文件ID集合已成功写入Redis")
    except Exception as e:
        print(f"写入Redis文件ID集合时发生错误: {e}")
        return "error"
    return None
def deleteCollectionFileByUserFileID(collection_name, file_id):
    """Remove every chunk of `file_id` from the collection and drop the id
    from the collection's Redis tracking set.

    Returns "success", or None on error.
    """
    try:
        kcollection = chroma_client.get_or_create_collection(collection_name)
        # Delete all chunks whose metadata points at this file.
        kcollection.delete(where={"user_file_id": file_id})
        # BUG FIX: the tracking set lives under
        # "kbase_py_<collection>_file_id_set" (see KBaseTextAddByUserFileID);
        # the original SREM'd the bare collection name, so the id was never
        # removed and the file would not be re-ingested later.
        key = "kbase_py_"+collection_name+"_file_id_set"
        r.srem(key, file_id)
        print("文件ID集合已成功删除")
        return "success"
    except Exception as e:
        print(f"删除文件ID集合时发生错误: {e}")
        return None
def KBaseChroma(data):
    """Handle one knowledge-base query request.

    Syncs the Chroma collection with the request's FileIDs list (ingesting
    at most one missing file per call, deleting files no longer listed),
    then similarity-queries the question and returns the raw results.
    """
    kbase = data["knowledge_base"]
    userID = data["im_context"]["user_id"]
    query_text = data["im_context"]["question"]
    fileIDs = json.loads(kbase["FileIDs"])
    # Diff the requested file ids against the ids already ingested (Redis).
    fileIDSet = readKBaseRedisFileIDSet(kbase["UUID"])
    if fileIDSet is None:
        # Defensive: a Redis failure must not crash the membership and
        # set-difference operations below.
        fileIDSet = set()
    print("fileIDSet:", fileIDSet)
    s2 = set()
    for fileID in fileIDs:
        s2.add(str(fileID["file_id"]))
        if str(fileID["file_id"]) in fileIDSet:
            print("文件ID已存在:", fileID["file_id"])
            continue
        # Ingest the first missing file, then stop — at most one ingestion
        # per request. NOTE(review): remaining new files are only picked up
        # on later calls; confirm this incremental behaviour is intended.
        KBaseTextAddByUserFileID(kbase["UUID"], userID, fileID["file_id"])
        break
    # Ids tracked in Redis but no longer requested -> delete their chunks.
    # NOTE(review): Redis members are strings while chunk metadata stores the
    # original (likely int) file id — verify the delete filter matches.
    sdel = fileIDSet - s2
    print("sdel:", sdel)
    for fileID in sdel:
        deleteCollectionFileByUserFileID(kbase["UUID"], fileID)
    results = KBaseQuery(kbase, query_text, 3)
    return results
# if __name__ == "__main__":
# s = r'{"im_context":{"user_id":1,"session_id":277,"function_id":6,"model_id":2,"model_type":"ollama","question":"test","channel":"user_1_ai_chat_msg_95aa6472-aebd-424f-bdae-954bb537b813","is_has_image":false},"knowledge_base":{"ID":3,"Name":"test2","Description":"test2","UUID":"0ba47d27-767b-40d4-91e6-cb1adfdcfcff","SessionID":277,"CreatedAt":"","UpdatedAt":"","DeletedAt":"","AuthID":0,"FileNameList":"","FileIDs":"[{\"file_id\":6}]"}}'
# res = json.loads(s)
# KBaseChroma(res)
# kbase = res["knowledge_base"]
# print("kbase:", kbase)
# res = deleteCollectionFileByUserFileID(kbase["UUID"], 6)
# print("deleteCollectionFileByUserFileID:", res)
# results = KBaseChroma(res)
# print("查询结果:", results)
# # userID = res["im_context"]["user_id"]
# # KBaseTextAdd(kbase, userID)
# results = KBaseQuery(kbase, "AI grade your essays", 3)
# print("查询结果:", results)
# testChroma()
# path = r"E:\Code\saw-ai\file\人生 (路遥).txt"
# text = ""
# try:
# with open(path, 'r', encoding='utf-8') as file:
# text = file.read()
# except FileNotFoundError:
# print(f"文件 {path} 未找到。")
# except Exception as e:
# print(f"读取文件时发生错误: {e}")
# metadata = {"source": "人生 (路遥).txt"}
# add_text_to_chroma(text, metadata)
# 示例:查询 ChromaDB
# query_text = "德顺爷爷"
# results = query_chroma(query_text)
# if results:
# # result的key
# for key in results.keys():
# print("key:", key)
# print("查询结果:", len(results["documents"][0]) if results["documents"] else 0)
# # 打印查询结果
# print("查询结果:", results)

124
kbase_py/fileCon.py Normal file
View File

@ -0,0 +1,124 @@
import requests
import fileBase
def get_will_convert_file_list():
    """Ask the file service for the list of files awaiting content conversion.

    Returns the decoded JSON response, or None on a non-200 status.
    """
    url = "https://pm.ljsea.top/file/get_file_will_convert_content?super_id=1"  # replace with the real API URL
    headers = {
        "token": "5413vfd3sdgtr56hrtagvbbmla",
    }
    response = requests.post(url, headers=headers)
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
        return None
    return response.json()
def get_user_file_info(user_id,user_file_id):
    """Fetch a single user-file record from the file service.

    Returns the response "data" payload when the service reports code 0,
    otherwise None (with a log line).
    """
    url = "https://pm.ljsea.top/file/file_list?super_id=" + str(user_id)  # replace with the real API URL
    headers = {
        "token": "5413vfd3sdgtr56hrtagvbbmla",
    }
    payload = {
        "type": "one",
        "file_id": user_file_id,  # user-level file id
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
        return None
    res = response.json()
    if res["code"] != 0:
        print("获取文件内容失败:", res)
        return None
    return res["data"]
def create_file_content(fileID,fileContent):
    """Upload extracted text for a file to the file service.

    Returns the decoded JSON response, or None on a non-200 status.
    """
    url = "https://pm.ljsea.top/file/create_file_content?super_id=1"  # replace with the real API URL
    headers = {
        "token": "5413vfd3sdgtr56hrtagvbbmla",
    }
    payload = {
        "file_id": fileID,  # physical file id
        "file_content": fileContent,
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
        return None
    return response.json()
def get_file_content(id):
    """Fetch the stored text content for a file id from the file service.

    Returns the "data" payload when the service reports code 0, else None.
    (Parameter keeps its original name `id` — callers may pass by keyword.)
    """
    url = "https://pm.ljsea.top/file/find_file_content?super_id=1"  # replace with the real API URL
    headers = {
        "token": "5413vfd3sdgtr56hrtagvbbmla",
    }
    payload = {
        "user_file_id": id  # physical file id
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
        return None
    res = response.json()
    if res["code"] != 0:
        print("获取文件内容失败:", res)
        return None
    return res["data"]
def get_file():
    """Return the list of files awaiting conversion, or None on failure.

    BUG FIX: get_will_convert_file_list returns None on HTTP failure; the
    original subscripted the response unconditionally and raised TypeError.
    """
    file_list_resp = get_will_convert_file_list()
    if file_list_resp is None or file_list_resp["code"] != 0:
        print("获取文件列表失败:", file_list_resp)
        return None
    return file_list_resp["data"]
def get_file_add_convert():
    """Convert one pending file to text and upload the result.

    Downloads the file, extracts its text, and posts it back via
    create_file_content. At most one file is uploaded per call (the
    original `break`); NOTE(review): confirm the one-file-per-run limit is
    intended rather than a leftover from testing.
    """
    file_list = get_file()
    if not file_list:
        print("未能获取文件列表")
        return
    for file in file_list:
        print("文件ID:", file["ID"])
        print("文件名称:", file["FileName"], "\t", file["FileStoreName"])
        text = fileBase.get_file_content(file["FileStoreName"])
        if text is None:
            # BUG FIX: extraction can fail and return None; len(None) raised
            # TypeError in the original. Skip and try the next file.
            print("文件内容提取失败:", file["FileStoreName"])
            continue
        print("文件内容长度:", len(text))
        # Upload the extracted content, then stop after the first success.
        print(create_file_content(file["ID"], text))
        break
def get_Kbase_server_id():
    """NOTE(review): this looks like an unfinished copy-paste of
    get_file_content — it posts to find_file_content and its behavior does
    not match its name. `id` in the payload below resolves to the *builtin*
    function (there is no parameter or local named id), so JSON-encoding the
    request body will fail at runtime. Flag for the author to implement.
    """
    url = "https://pm.ljsea.top/file/find_file_content?super_id=1" # replace with the real API URL
    #POST request
    headers = {
        "token":"5413vfd3sdgtr56hrtagvbbmla",
    }
    data = {
        "user_file_id": id  # BUG: `id` is the builtin function here
    }
    response = requests.post(url, headers=headers,json=data)
    if response.status_code == 200:
        res = response.json()
        if res["code"] == 0:
            return res["data"]
        else:
            print("获取文件内容失败:",res)
            return None
    else:
        print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
        return None
# if __name__ == "__main__":
# get_file_add_convert()

114
kbase_py/im_ws.py Normal file
View File

@ -0,0 +1,114 @@
import websocket
import time
import json
import fileChroma
import threading
# Runtime configuration; populated from saw-ai-py.json in __main__.
config = {}
# Shutdown flag checked by on_close to decide whether to reconnect.
stop_event = False
def on_open(ws):
    """websocket-client callback: fired once the connection is established."""
    print("连接已建立")
def on_message(ws, message):
    """Handle one knowledge-base query pushed by the server.

    The payload is a JSON envelope (im_context + knowledge_base); it is run
    through fileChroma.KBaseChroma and the same envelope is sent back with
    the query results attached under "query_select".
    """
    print("接收到服务器消息大小:", len(message))
    request = json.loads(message)
    results = fileChroma.KBaseChroma(request)
    reply = {
        "im_context": request["im_context"],
        "knowledge_base": request["knowledge_base"],
        "query_select": results,
    }
    payload = json.dumps(reply, ensure_ascii=False)
    print("发送到服务器消息大小:", len(payload))
    send_message(ws, payload)  # reply over the same socket
#发送消息
def send_message(ws, message):
    """Send `message` over the socket; log and swallow any send failure."""
    try:
        ws.send(message)
    except Exception as e:
        print(f"发送消息失败: {e}")
def on_close(ws, close_status_code, close_msg):
    """websocket-client callback: reconnect with bounded retries, unless the
    user requested shutdown via stop_event."""
    if stop_event:
        # User-initiated shutdown (input_listener): do not reconnect.
        print("WebSocket 已关闭")
        return
    global config
    print("连接已关闭,尝试重新连接...")
    MAX_RETRIES = config.get("ws_max_retries", 5)  # max reconnect attempts
    RETRY_DELAY = config.get("ws_retry_delay", 5)  # seconds between attempts
    retries = 0
    while retries < MAX_RETRIES:
        try:
            time.sleep(RETRY_DELAY)
            # Rebuild the app with the same callbacks — including this one,
            # so a later disconnect triggers another reconnect round.
            ws = websocket.WebSocketApp(config["ws_url"]+"&kbase_server_id="+config["kbase_server_id"],
                    on_open=on_open,
                    on_message=on_message,
                    on_close=on_close,
                    # send_message=send_message,
                    on_error=on_error)
            ws.run_forever()
            break
        except Exception as e:
            retries += 1
            print(f"重连尝试 {retries} 失败: {e}")
            if retries == MAX_RETRIES:
                print("达到最大重连次数,停止尝试。")
def on_error(ws, error):
    """websocket-client callback: log transport-level errors."""
    print(f"发生错误: {error}")
def readConfig():
    """Load the JSON configuration file.

    Returns the parsed dict, or None when the file is missing or malformed.
    NOTE: the config path is hard-coded.
    """
    config_path = r'E:\Code\saw-ai\python-sag\saw-ai-py.json'
    try:
        with open(config_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)
    except FileNotFoundError:
        print("配置文件未找到")
    except json.JSONDecodeError:
        print("配置文件格式错误")
    return None
def writeConfig(config):
    """Persist `config` to the JSON configuration file; failures are logged
    and swallowed. NOTE: the config path is hard-coded."""
    config_path = r'E:\Code\saw-ai\python-sag\saw-ai-py.json'
    try:
        with open(config_path, 'w', encoding='utf-8') as handle:
            json.dump(config, handle, ensure_ascii=False, indent=4)
    except Exception as e:
        print(f"写入配置文件失败: {e}")
#监听用户输入
def input_listener(ws):
    """Block on stdin; when the user types "stop", flag shutdown and close
    the WebSocket. Runs on a helper thread started from __main__.
    """
    # BUG FIX: without `global`, the assignment below bound a *local*
    # stop_event, so the module-level flag stayed False and on_close kept
    # reconnecting even after the user asked to stop.
    global stop_event
    while True:
        user_input = input()
        if user_input.lower() == "stop":
            stop_event = True
            ws.close()
            break
if __name__ == "__main__":
    # Load configuration (ws_url, kbase_server_id, retry settings, ...).
    config = readConfig()
    if config is None:
        print("无法读取配置文件")
        exit(1)
    print("config:",config)
    # Open the WebSocket to the IM server, identifying this knowledge-base
    # worker via kbase_server_id appended to the URL.
    ws = websocket.WebSocketApp(config["ws_url"]+"&kbase_server_id="+config["kbase_server_id"],
            on_open=on_open,
            on_message=on_message,
            on_close=on_close,
            # send_message=send_message,
            on_error=on_error)
    # Watch stdin for a "stop" command on a separate thread.
    input_thread = threading.Thread(target=input_listener, args=(ws,))
    input_thread.start()
    # Block here servicing the connection until closed.
    ws.run_forever()

9
kbase_py/saw-ai-py.json Normal file
View File

@ -0,0 +1,9 @@
{
"kbase_server_id": "win_675b5u4hs",
"ws_url": "wss://pm.ljsea.top/im/kbase_text_ws?token=5413vfd3sdgtr56hrtagvbbmla&super_id=1",
"ws_max_retries": 0,
"ws_retry_delay": 1,
"kbase_info": {"vfdbvd":{"1":true,"collection_name":""}},
"redis_url": "localhost:6379",
"redis_db": 0
}

View File

@ -48,3 +48,9 @@ type GenerateResp struct {
Message string `json:"message"`
Data any `json:"data"`
}
// MessageConvertFileReq is the form/JSON payload accepted by the
// /file/convert_msg_to_file endpoint.
type MessageConvertFileReq struct {
	MessageID int    `json:"message_id" form:"message_id"` // ID of the message to convert
	FileType  string `json:"file_type" form:"file_type"`   // target file type for the conversion
	FileName  string `json:"file_name" form:"file_name"`   // name for the generated file
}