2025-04-10 16:06:08 +08:00
|
|
|
import os
|
|
|
|
|
import docx
|
|
|
|
|
import PyPDF2
|
|
|
|
|
from epub_conversion.utils import open_book, convert_epub_to_lines
|
|
|
|
|
import subprocess
|
|
|
|
|
import ebooklib
|
|
|
|
|
from ebooklib import epub
|
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
def extract_text_from_txt(file_path):
    """Read a UTF-8 encoded TXT file and return its full contents.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The file contents as a string, or None if opening or reading
        the file raised an exception (the error is printed).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as err:
        print(f"读取 TXT 文件时出错: {err}")
        return None
|
|
|
|
|
|
|
|
|
|
def extract_text_from_docx(file_path):
    """Return the text of every paragraph in a DOCX file, one per line.

    Args:
        file_path: Path of the DOCX file to open with docx.Document.

    Returns:
        The paragraph texts joined with newlines, or None if loading or
        reading the document raised an exception (the error is printed).
    """
    try:
        document = docx.Document(file_path)
        return '\n'.join(paragraph.text for paragraph in document.paragraphs)
    except Exception as err:
        print(f"读取 DOCX 文件时出错: {err}")
        return None
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page in a PDF file.

    Args:
        file_path: Path of the PDF file, opened in binary mode.

    Returns:
        The text of all pages joined with no separator, or None if
        opening or parsing the file raised an exception (the error is
        printed).
    """
    try:
        with open(file_path, 'rb') as stream:
            reader = PyPDF2.PdfReader(stream)
            # Pages are joined with no separator, as the extracted text is
            # simply appended page after page.
            return "".join(page.extract_text() for page in reader.pages)
    except Exception as err:
        print(f"读取 PDF 文件时出错: {err}")
        return None
|
|
|
|
|
|
2025-04-10 16:37:38 +08:00
|
|
|
def extract_text_from_epub(epub_path):
    """Extract the plain text of every document item in an EPUB file.

    Each item whose type is ebooklib.ITEM_DOCUMENT is parsed as HTML with
    BeautifulSoup and reduced to its text; a newline is appended after
    each item.

    Args:
        epub_path: Path of the EPUB file to read.

    Returns:
        The concatenated text of all document items, or None if reading
        or parsing raised an exception (the error is printed). Failing
        softly matches the other extract_text_from_* helpers, so callers
        get the same None-on-error contract for every format.
    """
    try:
        book = epub.read_epub(epub_path)
        parts = []
        for item in book.get_items():
            # Skip images, stylesheets and other non-document resources.
            if item.get_type() != ebooklib.ITEM_DOCUMENT:
                continue
            soup = BeautifulSoup(item.content, 'html.parser')
            parts.append(soup.get_text() + "\n")
        return "".join(parts)
    except Exception as e:
        print(f"读取 EPUB 文件时出错: {e}")
        return None
|
2025-04-10 16:06:08 +08:00
|
|
|
|
|
|
|
|
def extract_text_from_doc(file_path):
    """Extract text from a legacy DOC file by converting it to DOCX first.

    The conversion is done by LibreOffice (`soffice --headless`). The
    converted file is written into a temporary directory rather than next
    to the source, so an existing `<name>.docx` beside the DOC file is
    never overwritten or deleted, and the source directory does not have
    to be writable.

    Args:
        file_path: Path of the DOC file to convert and read.

    Returns:
        The text returned by extract_text_from_docx for the converted
        file, or None if the conversion or extraction failed (the error
        is printed).
    """
    import tempfile  # local import: only this function needs it

    try:
        with tempfile.TemporaryDirectory() as out_dir:
            # check=True turns a non-zero soffice exit status into an
            # exception instead of silently continuing.
            subprocess.run(
                ['soffice', '--headless', '--convert-to', 'docx',
                 file_path, '--outdir', out_dir],
                check=True,
            )
            # soffice names the output after the input's base name.
            stem = os.path.splitext(os.path.basename(file_path))[0]
            docx_path = os.path.join(out_dir, stem + '.docx')
            # Read the text before the temporary directory is removed.
            return extract_text_from_docx(docx_path)
    except Exception as e:
        print(f"读取 DOC 文件时出错: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
def extract_text(file_path):
    """Dispatch to the extractor that matches the file's extension.

    The extension is compared case-insensitively. Supported extensions
    are .txt, .docx, .pdf, .epub and .doc.

    Args:
        file_path: Path of the file to extract text from.

    Returns:
        Whatever the selected extractor returns, or None (after printing
        a message) when the extension is not supported.
    """
    suffix = os.path.splitext(file_path)[1].lower()

    # Reject unknown formats up front so the rest is a flat dispatch.
    if suffix not in ('.txt', '.docx', '.pdf', '.epub', '.doc'):
        print(f"不支持的文件格式: {suffix}")
        return None

    if suffix == '.txt':
        return extract_text_from_txt(file_path)
    if suffix == '.docx':
        return extract_text_from_docx(file_path)
    if suffix == '.pdf':
        return extract_text_from_pdf(file_path)
    if suffix == '.epub':
        return extract_text_from_epub(file_path)
    return extract_text_from_doc(file_path)
|
|
|
|
|
|
|
|
|
|
# # Example usage
# file_path = '/home/saw/file/人生 (路遥).epub'  # replace with the actual file path
# text = extract_text(file_path)
# if text:
#     print(text[:1000])  # print the first 1000 characters
|
|
|
|
|
|
|
|
|
|
def get_file_content_by_path(file_path):
    """Return the text extracted from a local file.

    Args:
        file_path: Path of the file passed to extract_text.

    Returns:
        The extracted text, or None (after printing a message) when
        extract_text returned None or an empty string.
    """
    content = extract_text(file_path)
    if not content:
        print("未能获取文件内容")
        return None
    return content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dowload_file(file_url, save_path, timeout=30):
    """Download a URL with an HTTP GET and write the body to a local file.

    Args:
        file_url: URL to fetch.
        save_path: Local path the response body is written to; opened in
            'wb' mode, so an existing file is overwritten.
        timeout: Seconds passed to requests.get as its timeout, so an
            unresponsive server cannot block the caller indefinitely.

    Returns:
        True if the server answered with status 200 and the file was
        written; False on any other status code or if an exception was
        raised (the reason is printed in both cases).
    """
    try:
        response = requests.get(file_url, timeout=timeout)
        if response.status_code != 200:
            print(f"下载失败,状态码: {response.status_code}")
            return False
        with open(save_path, 'wb') as file:
            file.write(response.content)
        print(f"文件下载成功: {save_path}")
        return True
    except Exception as e:
        print(f"下载文件时出错: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
# fileName is appended to the download URL; the file is fetched from the network.
|
|
|
|
|
def get_file_content(fileName):
    """Return the text of a named file, downloading it first if needed.

    The file is looked up at '/home/saw/file/' + fileName. If it already
    exists there, its text is extracted and returned directly. Otherwise
    it is downloaded from 'https://pm.ljsea.top/tool/file/' + fileName to
    that path and then extracted.

    Args:
        fileName: Name appended to both the local directory and the
            download URL.

    Returns:
        For an already-present file, whatever extract_text returns. For a
        freshly downloaded file, the extracted text, or None (after
        printing a message) when extraction yielded None or an empty
        string.
    """
    local_path = '/home/saw/file/' + fileName

    # Reuse a previously downloaded copy when one is present.
    if os.path.exists(local_path):
        print(f"文件已存在: {local_path}")
        return extract_text(local_path)

    dowload_file('https://pm.ljsea.top/tool/file/' + fileName, local_path)

    content = extract_text(local_path)
    if not content:
        print("未能获取文件内容")
        return None
    return content
|