diff --git a/handler/file.go b/handler/file.go index 65fb733..431aff8 100644 --- a/handler/file.go +++ b/handler/file.go @@ -23,6 +23,7 @@ func SetUpFileGroup(router *gin.Engine) { fileGroup.POST("/find_file_content", FindFileContent) fileGroup.POST("/create_file_content", CreateFileContent) fileGroup.POST("/get_file_will_convert_content", GetFileWillConvertContent) //需要将文件转为文件内容的文件列表接口 + fileGroup.POST("/convert_msg_to_file", ConvertMsgToFile) //将消息转为文件 } @@ -359,3 +360,25 @@ func GetFileWillConvertContent(c *gin.Context) { } c.JSON(http.StatusOK, resp) } + +func ConvertMsgToFile(c *gin.Context) { + id, _ := c.Get("id") + userId := int(id.(float64)) + var req proto.MessageConvertFileReq + var resp proto.GenerateResp + if err := c.ShouldBind(&req); err == nil { + files, err2 := service.GetFileWillConvertContentFileList(userId) + if err2 != nil { + resp.Code = proto.ParameterError + resp.Message = "find file content failed:" + err2.Error() + } else { + resp.Code = proto.SuccessCode + resp.Message = "success" + resp.Data = files + } + } else { + resp.Code = proto.ParameterError + resp.Message = "message convert file form parameter decode error:" + err.Error() + } + c.JSON(http.StatusOK, resp) +} diff --git a/kbase_py/fileBase.py b/kbase_py/fileBase.py new file mode 100644 index 0000000..d3f3c55 --- /dev/null +++ b/kbase_py/fileBase.py @@ -0,0 +1,141 @@ +import os +import docx +import PyPDF2 +from epub_conversion.utils import open_book, convert_epub_to_lines +import subprocess +import ebooklib +from ebooklib import epub +from bs4 import BeautifulSoup +import requests + +def extract_text_from_txt(file_path): + """从 TXT 文件中提取文本""" + try: + with open(file_path, 'r', encoding='utf-8') as file: + text = file.read() + return text + except Exception as e: + print(f"读取 TXT 文件时出错: {e}") + return None + +def extract_text_from_docx(file_path): + """从 DOCX 文件中提取文本""" + try: + doc = docx.Document(file_path) + full_text = [] + for para in doc.paragraphs: + 
full_text.append(para.text)
+        return '\n'.join(full_text)
+    except Exception as e:
+        print(f"读取 DOCX 文件时出错: {e}")
+        return None
+
+def extract_text_from_pdf(file_path):
+    """Extract text from a PDF file; returns None on failure."""
+    try:
+        with open(file_path, 'rb') as file:
+            pdf_reader = PyPDF2.PdfReader(file)
+            text = ""
+            for page in pdf_reader.pages:
+                text += page.extract_text()
+            return text
+    except Exception as e:
+        print(f"读取 PDF 文件时出错: {e}")
+        return None
+
+def extract_text_from_epub(epub_path, txt_path=None):
+    """Extract text from an EPUB file.
+
+    Returns the extracted text so it matches the other extract_text_from_*
+    helpers; if txt_path is given, the text is additionally written to that
+    file (preserving the original side effect).
+    """
+    # Load the EPUB book
+    book = epub.read_epub(epub_path)
+
+    # Accumulate all document text here
+    full_text = ""
+
+    # Walk every item in the book
+    for item in book.get_items():
+        if item.get_type() == ebooklib.ITEM_DOCUMENT:
+            # Parse the HTML content
+            soup = BeautifulSoup(item.content, 'html.parser')
+            # Strip markup, keep plain text
+            text = soup.get_text()
+            # Append to the running total
+            full_text += text + "\n"
+
+    # Optionally write the text out to a TXT file
+    if txt_path:
+        with open(txt_path, 'w', encoding='utf-8') as file:
+            file.write(full_text)
+    # Fix: return the text — extract_text() calls this with one argument and
+    # expects a string back; the original wrote a file and returned None.
+    return full_text
+
+def extract_text_from_doc(file_path):
+    """Extract text from a legacy DOC file via a LibreOffice DOCX conversion."""
+    try:
+        # Path of the temporary DOCX produced by the conversion
+        docx_path = os.path.splitext(file_path)[0] + '.docx'
+        # Convert DOC -> DOCX with headless LibreOffice
+        subprocess.run(['soffice', '--headless', '--convert-to', 'docx', file_path, '--outdir', os.path.dirname(file_path)])
+        # Extract text from the converted DOCX
+        text = extract_text_from_docx(docx_path)
+        # Remove the temporary DOCX
+        if os.path.exists(docx_path):
+            os.remove(docx_path)
+        return text
+    except Exception as e:
+        print(f"读取 DOC 文件时出错: {e}")
+        return None
+
+def extract_text(file_path):
+    """Dispatch to the extractor matching the file extension."""
+    file_extension = os.path.splitext(file_path)[1].lower()
+    if file_extension == '.txt':
+        return extract_text_from_txt(file_path)
+    elif file_extension == '.docx':
+        return extract_text_from_docx(file_path)
+    elif file_extension == '.pdf':
+        return extract_text_from_pdf(file_path)
+    elif file_extension == '.epub':
+        # Works now that extract_text_from_epub's txt_path is optional
+        return extract_text_from_epub(file_path)
+    elif file_extension == '.doc':
+        return extract_text_from_doc(file_path)
+    else:
+        print(f"不支持的文件格式: {file_extension}")
+        return None
+
+# # Example usage
+# 
file_path = '/home/saw/file/人生 (路遥).epub' # 替换为实际的文件路径 +# text = extract_text(file_path) +# if text: +# print(text[:1000]) # 打印前 1000 个字符 + +def get_file_content_by_path(file_path): + """获取文件内容""" + text = extract_text(file_path) + if text: + return text + else: + print("未能获取文件内容") + return None + + +def dowload_file(file_url, save_path): + """下载文件""" + try: + response = requests.get(file_url) + if response.status_code == 200: + with open(save_path, 'wb') as file: + file.write(response.content) + print(f"文件下载成功: {save_path}") + else: + print(f"下载失败,状态码: {response.status_code}") + except Exception as e: + print(f"下载文件时出错: {e}") + +#filepath为fileurl,从网络下载 +def get_file_content(fileName): + save_path = '/home/saw/file/' + fileName + file_url = 'https://pm.ljsea.top/tool/file/' + fileName + dowload_file(file_url, save_path) + """获取文件内容""" + text = extract_text(save_path) + if text: + return text + else: + print("未能获取文件内容") + return None \ No newline at end of file diff --git a/kbase_py/fileChroma.py b/kbase_py/fileChroma.py new file mode 100644 index 0000000..6882155 --- /dev/null +++ b/kbase_py/fileChroma.py @@ -0,0 +1,293 @@ +import chromadb +from langchain.text_splitter import CharacterTextSplitter +import hashlib +import time +import json +import fileCon +import redis + +#chroma_client = chromadb.Client() +chroma_client = chromadb.PersistentClient(path=r"E:\Code\saw-ai\chroma") # 使用持久化存储 +collection = chroma_client.get_or_create_collection("test_collection") +r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True) + +def add_text_to_chroma(text, metadata=None): + """将文本添加到 ChromaDB""" + try: + # 分割文本 + text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20) + documents = text_splitter.split_text(text) + print("documents_:", len(documents[0])) + + # 生成唯一的 ID + ids = [] + timestamp = str(int(time.time() * 1000)) + for i, doc in enumerate(documents): + unique_string = doc + timestamp + str(i) + id_ = 
hashlib.sha256(unique_string.encode()).hexdigest() + ids.append(id_) + print("ids:", len(ids[0])) + + # 使用 ChromaDB 的 add 方法将文本和元数据添加到集合中 + collection.add( + documents=documents, + metadatas=[metadata] * len(documents) if metadata else [None] * len(documents), + ids=ids + ) + print("文本已成功添加到 ChromaDB") + print("集合中的文档数量:", collection.count()) + except Exception as e: + print(f"添加文本到 ChromaDB 时发生错误: {e}") + + +def query_chroma(query_text, n_results=5): + """查询 ChromaDB""" + try: + results = collection.query( + query_texts=[query_text], + n_results=n_results + ) + return results + except Exception as e: + print(f"查询 ChromaDB 时发生错误: {e}") + return None + + +def testChroma(): + """测试 ChromaDB""" + # 添加文本到 ChromaDB + text = "这是一个测试文本。" + metadata = {"source": "test.txt"} + add_text_to_chroma(text, metadata) + + # 查询 ChromaDB + query_text = "测试" + results = query_chroma(query_text) + print("查询结果:", json.dumps(results, ensure_ascii=False, indent=4)) + if results: + print("查询结果:", results) + +def KBaseTextAdd(kbase,userID): + fileIDs = json.loads(kbase["FileIDs"]) + # print("fileIDs:", fileIDs) + # print("fileIDs type:", type(fileIDs)) + collection_name = kbase["UUID"] + print("collection_name:", collection_name) + kcollection = chroma_client.get_or_create_collection(collection_name) + for userFileID in fileIDs: + #查找用户文件信息 + userFile = fileCon.get_user_file_info(userID, userFileID["file_id"]) + if userFile == None: + print("获取文件信息失败,userfileID:", userFileID["file_id"]) + continue + #print("userFile:", userFile) + # file_id = userFile["FileID"] + file_id = userFileID["file_id"] #实际文件id + data = fileCon.get_file_content(file_id) #获取文件内容 + if data == None: + print("获取文件内容失败,file_id:", file_id) + continue + text = data[0]["FileContent"] + print("text:", text[:100]) + print("text length:", len(text), "file_id:", file_id) + # 分割文本 + text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20,separator=".") + documents = text_splitter.split_text(text) + print("split 
documents:", len(documents[0]), " length:", len(documents)) + ids = [] + for i, doc in enumerate(documents): + unique_string = doc + str(i) + id_ = hashlib.sha256(unique_string.encode()).hexdigest() + ids.append(id_) + #获取文件内容 + metadata = [] + for i in range(len(documents)): + metadata.append({"source": userFile["UserFileName"], "user_file_id": file_id, "id": ids[i]} ) + print("metadata:", metadata[0], " length:", len(metadata)) + print("documents:", len(documents[0]), " length:", len(documents)) + print("ids:", len(ids[0]), " length:", len(ids)) + # Chroma.from_documents(documents, metadatas=metadata, ids=ids, collection_name=collection_name) + kcollection.add( + documents=documents, + metadatas=metadata, + ids=ids + ) + # Chroma.persist() + print("文本已成功添加到 ChromaDB") + + +def KBaseTextAddByUserFileID(collection_name,userID,file_id): + + kcollection = chroma_client.get_or_create_collection(collection_name) + #查找用户文件信息 + userFile = fileCon.get_user_file_info(userID, file_id) + if userFile == None: + print("获取文件信息失败,userfileID:", file_id) + return None + + data = fileCon.get_file_content(file_id) #获取文件内容 + if data == None: + print("获取文件内容失败,file_id:", file_id) + return None + text = data[0]["FileContent"] + print("text:", text[:100]) + print("text length:", len(text), "file_id:", file_id) + # 分割文本 + text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=20,separator=".") + documents = text_splitter.split_text(text) + print("split documents:", len(documents[0]), " length:", len(documents)) + ids = [] + for i, doc in enumerate(documents): + unique_string = doc + str(i) + id_ = hashlib.sha256(unique_string.encode()).hexdigest() + ids.append(id_) + #获取文件内容 + metadata = [] + for i in range(len(documents)): + metadata.append({"source": userFile["UserFileName"], "user_file_id": file_id, "id": ids[i]} ) + print("metadata:", metadata[0], " length:", len(metadata)) + print("documents:", len(documents[0]), " length:", len(documents)) + print("ids:", len(ids[0]), 
" length:", len(ids)) + # Chroma.from_documents(documents, metadatas=metadata, ids=ids, collection_name=collection_name) + kcollection.add( + documents=documents, + metadatas=metadata, + ids=ids + ) + # Chroma.persist() + #redis中添加文件ID集合 + key = "kbase_py_"+collection_name+"_file_id_set" + r.sadd(key, file_id) + print("文本已成功添加到 ChromaDB,文件ID集合已成功添加到Redis:",key) + return "success" + + +def KBaseQuery(kbase, query_text, n_results=3): + """查询 ChromaDB""" + try: + collection_name = kbase["UUID"] + print("collection_name:", collection_name) + kcollection = chroma_client.get_or_create_collection(collection_name) + results = kcollection.query( + query_texts=[query_text], + n_results=n_results + ) + return results + except Exception as e: + print(f"查询 ChromaDB 时发生错误: {e}") + return None + +def readKBaseRedisFileIDSet(uuid): + """读取Redis中的文件ID集合""" + try: + # 连接到Redis数据库 + r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True) + # 获取文件ID集合 + key = "kbase_py_"+uuid+"_file_id_set" + print("read key:", key) + file_id_set = r.smembers(key) + return file_id_set + except Exception as e: + print(f"读取Redis文件ID集合时发生错误: {e}") + return None + +def writeKBaseRedisFileIDSet(uuid, file_id_set): + """将文件ID集合写入Redis""" + try: + # 连接到Redis数据库 + r = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True) + # 将文件ID集合写入Redis + key = "kbase_py_"+uuid+"_file_id_set" + print("write key:", key) + r.sadd("kbase_py_"+uuid, *file_id_set) + print("文件ID集合已成功写入Redis") + except Exception as e: + print(f"写入Redis文件ID集合时发生错误: {e}") + return "error" + return None + +def deleteCollectionFileByUserFileID(collection_name, file_id): + """删除指定文件ID的集合""" + try: + kcollection = chroma_client.get_or_create_collection(collection_name) + kcollection.delete(where={"user_file_id": file_id}) + ## 删除Redis中的文件ID集合 + r.srem(collection_name, file_id) + print("文件ID集合已成功删除") + return "success" + except Exception as e: + print(f"删除文件ID集合时发生错误: {e}") + return None + +def 
KBaseChroma(data): + kbase = data["knowledge_base"] + userID = data["im_context"]["user_id"] + query_text = data["im_context"]["question"] + fileIDs = json.loads(kbase["FileIDs"]) + #判断文件id与保存的文件id是否一致,如果一致则不需要重新添加,需要增加和删除 + fileIDSet = readKBaseRedisFileIDSet(kbase["UUID"]) + print("fileIDSet:", fileIDSet) + s2 = set() + for fileID in fileIDs: + s2.add(str(fileID["file_id"])) + if str(fileID["file_id"]) in fileIDSet: + print("文件ID已存在:", fileID["file_id"]) + continue + if str(fileID["file_id"]) not in fileIDSet: + #添加文件 + KBaseTextAddByUserFileID(kbase["UUID"], userID, fileID["file_id"]) + break + #删除的文件id:在s2中不存在,在fileIDSet中存在 + sdel = fileIDSet - s2 + print("sdel:", sdel) + for fileID in sdel: + #删除文件 + deleteCollectionFileByUserFileID(kbase["UUID"], fileID) + results = KBaseQuery(kbase, data["im_context"]["question"], 3) + return results + + + +# if __name__ == "__main__": +# s = r'{"im_context":{"user_id":1,"session_id":277,"function_id":6,"model_id":2,"model_type":"ollama","question":"test","channel":"user_1_ai_chat_msg_95aa6472-aebd-424f-bdae-954bb537b813","is_has_image":false},"knowledge_base":{"ID":3,"Name":"test2","Description":"test2","UUID":"0ba47d27-767b-40d4-91e6-cb1adfdcfcff","SessionID":277,"CreatedAt":"","UpdatedAt":"","DeletedAt":"","AuthID":0,"FileNameList":"","FileIDs":"[{\"file_id\":6}]"}}' +# res = json.loads(s) +# KBaseChroma(res) +# kbase = res["knowledge_base"] +# print("kbase:", kbase) +# res = deleteCollectionFileByUserFileID(kbase["UUID"], 6) +# print("deleteCollectionFileByUserFileID:", res) + # results = KBaseChroma(res) + # print("查询结果:", results) + +# # userID = res["im_context"]["user_id"] +# # KBaseTextAdd(kbase, userID) + +# results = KBaseQuery(kbase, "AI grade your essays", 3) +# print("查询结果:", results) + # testChroma() + # path = r"E:\Code\saw-ai\file\人生 (路遥).txt" + # text = "" + # try: + # with open(path, 'r', encoding='utf-8') as file: + # text = file.read() + # except FileNotFoundError: + # print(f"文件 {path} 未找到。") + # except 
Exception as e: + # print(f"读取文件时发生错误: {e}") + + # metadata = {"source": "人生 (路遥).txt"} + # add_text_to_chroma(text, metadata) + + # 示例:查询 ChromaDB + # query_text = "德顺爷爷" + # results = query_chroma(query_text) + # if results: + # # result的key + # for key in results.keys(): + # print("key:", key) + + # print("查询结果:", len(results["documents"][0]) if results["documents"] else 0) + + # # 打印查询结果 + # print("查询结果:", results) + \ No newline at end of file diff --git a/kbase_py/fileCon.py b/kbase_py/fileCon.py new file mode 100644 index 0000000..03b4dc8 --- /dev/null +++ b/kbase_py/fileCon.py @@ -0,0 +1,124 @@ +import requests +import fileBase + +def get_will_convert_file_list(): + url = "https://pm.ljsea.top/file/get_file_will_convert_content?super_id=1" # 替换为实际的API URL + #post请求 + headers ={ + "token":"5413vfd3sdgtr56hrtagvbbmla", + } + response = requests.post(url, headers=headers) + if response.status_code == 200: + return response.json() + else: + print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}") + return None + +def get_user_file_info(user_id,user_file_id): + url = "https://pm.ljsea.top/file/file_list?super_id=" + str(user_id) # 替换为实际的API URL + #post请求 + headers = { + "token":"5413vfd3sdgtr56hrtagvbbmla", + } + data = { + "type": "one", + "file_id": user_file_id, #用户文件id + } + response = requests.post(url, headers=headers,json=data) + if response.status_code == 200: + res = response.json() + if res["code"] == 0: + return res["data"] + else: + print("获取文件内容失败:",res) + return None + else: + print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}") + return None + + +def create_file_content(fileID,fileContent): + url = "https://pm.ljsea.top/file/create_file_content?super_id=1" # 替换为实际的API URL + #post请求 + headers ={ + "token":"5413vfd3sdgtr56hrtagvbbmla", + } + data = { + "file_id": fileID, #实际文件id + "file_content": fileContent + } + response = requests.post(url, headers=headers,json=data) + if response.status_code == 200: + return response.json() 
+ else: + print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}") + return None + +def get_file_content(id): + url = "https://pm.ljsea.top/file/find_file_content?super_id=1" # 替换为实际的API URL + #post请求 + headers = { + "token":"5413vfd3sdgtr56hrtagvbbmla", + } + data = { + "user_file_id": id #实际文件id + } + response = requests.post(url, headers=headers,json=data) + if response.status_code == 200: + res = response.json() + if res["code"] == 0: + return res["data"] + else: + print("获取文件内容失败:",res) + return None + else: + print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}") + return None + +def get_file(): + file_list_resp = get_will_convert_file_list() + if file_list_resp["code"] == 0: + file_list = file_list_resp["data"] + return file_list + else: + print("获取文件列表失败:",file_list_resp) + return None + + +def get_file_add_convert(): + file_list = get_file() + if file_list: + for file in file_list: + print("文件ID:", file["ID"]) + print("文件名称:", file["FileName"], "\t", file["FileStoreName"]) + text = fileBase.get_file_content(file["FileStoreName"]) + print("文件内容长度:", len(text)) + #将文件内容上传 + print(create_file_content(file["ID"], text)) + break + else: + print("未能获取文件列表") + +def get_Kbase_server_id(): + url = "https://pm.ljsea.top/file/find_file_content?super_id=1" # 替换为实际的API URL + #post请求 + headers = { + "token":"5413vfd3sdgtr56hrtagvbbmla", + } + data = { + "user_file_id": id #实际文件id + } + response = requests.post(url, headers=headers,json=data) + if response.status_code == 200: + res = response.json() + if res["code"] == 0: + return res["data"] + else: + print("获取文件内容失败:",res) + return None + else: + print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}") + return None + +# if __name__ == "__main__": +# get_file_add_convert() \ No newline at end of file diff --git a/kbase_py/im_ws.py b/kbase_py/im_ws.py new file mode 100644 index 0000000..6c857bc --- /dev/null +++ b/kbase_py/im_ws.py @@ -0,0 +1,114 @@ +import websocket +import time +import 
json +import fileChroma +import threading + +config = {} +stop_event = False + +def on_open(ws): + print("连接已建立") + +def on_message(ws, message): + #print(f"接收到服务器消息: {message}") + print("接收到服务器消息大小:", len(message)) + res = json.loads(message) + #print("req:", res) + results = fileChroma.KBaseChroma(res) + resp ={} + resp["im_context"] = res["im_context"] + resp["knowledge_base"] = res["knowledge_base"] + resp["query_select"] = results + #print("resp:",resp) + respStr = json.dumps(resp, ensure_ascii=False) + print("发送到服务器消息大小:", len(respStr)) + send_message(ws, respStr) # 发送消息 + +#发送消息 +def send_message(ws, message): + try: + ws.send(message) + #print(f"发送消息: {message}") + except Exception as e: + print(f"发送消息失败: {e}") + +def on_close(ws, close_status_code, close_msg): + if stop_event: + print("WebSocket 已关闭") + return + global config + print("连接已关闭,尝试重新连接...") + MAX_RETRIES = config.get("ws_max_retries", 5) # 最大重连次数 + RETRY_DELAY = config.get("ws_retry_delay", 5) # 重连延迟时间(秒) + retries = 0 + while retries < MAX_RETRIES: + try: + time.sleep(RETRY_DELAY) + ws = websocket.WebSocketApp(config["ws_url"]+"&kbase_server_id="+config["kbase_server_id"], + on_open=on_open, + on_message=on_message, + on_close=on_close, + # send_message=send_message, + on_error=on_error) + ws.run_forever() + break + except Exception as e: + retries += 1 + print(f"重连尝试 {retries} 失败: {e}") + if retries == MAX_RETRIES: + print("达到最大重连次数,停止尝试。") + +def on_error(ws, error): + print(f"发生错误: {error}") + +def readConfig(): + """读取配置文件""" + try: + with open(r'E:\Code\saw-ai\python-sag\saw-ai-py.json', 'r', encoding='utf-8') as file: + config = json.load(file) + return config + except FileNotFoundError: + print("配置文件未找到") + return None + except json.JSONDecodeError: + print("配置文件格式错误") + return None + +def writeConfig(config): + """写入配置文件""" + try: + with open(r'E:\Code\saw-ai\python-sag\saw-ai-py.json', 'w', encoding='utf-8') as file: + json.dump(config, file, ensure_ascii=False, indent=4) + except 
Exception as e:
+        print(f"写入配置文件失败: {e}")
+
+# Listen for console input; typing "stop" shuts the socket down cleanly
+def input_listener(ws):
+    # Fix: without this declaration the assignment below creates a dead
+    # local, so on_close never sees the flag and keeps reconnecting.
+    global stop_event
+    while True:
+        user_input = input()
+        if user_input.lower() == "stop":
+            stop_event = True
+            ws.close()
+            break
+
+
+if __name__ == "__main__":
+    config = readConfig()
+    if config is None:
+        print("无法读取配置文件")
+        exit(1)
+    print("config:",config)
+    # Create the WebSocket connection
+    ws = websocket.WebSocketApp(config["ws_url"]+"&kbase_server_id="+config["kbase_server_id"],
+                                on_open=on_open,
+                                on_message=on_message,
+                                on_close=on_close,
+                                # send_message=send_message,
+                                on_error=on_error)
+
+    input_thread = threading.Thread(target=input_listener, args=(ws,))
+    input_thread.start()
+    # Run the WebSocket connection (blocks until closed)
+    ws.run_forever()
+    
\ No newline at end of file
diff --git a/kbase_py/saw-ai-py.json b/kbase_py/saw-ai-py.json
new file mode 100644
index 0000000..5f40491
--- /dev/null
+++ b/kbase_py/saw-ai-py.json
@@ -0,0 +1,9 @@
+{
+    "kbase_server_id": "win_675b5u4hs",
+    "ws_url": "wss://pm.ljsea.top/im/kbase_text_ws?token=5413vfd3sdgtr56hrtagvbbmla&super_id=1",
+    "ws_max_retries": 0,
+    "ws_retry_delay": 1,
+    "kbase_info": {"vfdbvd":{"1":true,"collection_name":""}},
+    "redis_url": "localhost:6379",
+    "redis_db": 0
+}
\ No newline at end of file
diff --git a/proto/tool.go b/proto/tool.go
index aeb8627..a8f00d1 100644
--- a/proto/tool.go
+++ b/proto/tool.go
@@ -48,3 +48,9 @@ type GenerateResp struct {
 	Message string `json:"message"`
 	Data    any    `json:"data"`
 }
+
+// MessageConvertFileReq is the request body for /file/convert_msg_to_file.
+type MessageConvertFileReq struct {
+	MessageID int    `json:"message_id" form:"message_id"` // message ID to convert
+	FileType  string `json:"file_type" form:"file_type"`   // target file type to convert the message into
+	FileName  string `json:"file_name" form:"file_name"`   // output file name
+}