# Mirror of https://github.com/1Panel-dev/MaxKB.git (synced 2025-12-26 10:12:51 +00:00)
# 161 lines, 5.6 KiB, Python
# coding=utf-8
|
||
"""
|
||
@project: maxkb
|
||
@Author:虎
|
||
@file: text_split_handle.py
|
||
@date:2024/3/27 18:19
|
||
@desc:
|
||
"""
|
||
import io
|
||
import os
|
||
import re
|
||
import zipfile
|
||
from typing import List
|
||
from urllib.parse import urljoin
|
||
|
||
import uuid_utils.compat as uuid
|
||
from django.utils.translation import gettext_lazy as _
|
||
|
||
from common.handle.base_parse_qa_handle import BaseParseQAHandle
|
||
from common.handle.impl.qa.csv_parse_qa_handle import CsvParseQAHandle
|
||
from common.handle.impl.qa.xls_parse_qa_handle import XlsParseQAHandle
|
||
from common.handle.impl.qa.xlsx_parse_qa_handle import XlsxParseQAHandle
|
||
from common.utils.common import parse_md_image
|
||
from knowledge.models import File
|
||
|
||
|
||
class FileBufferHandle:
    """Read-once cache for the raw content of an uploaded file."""

    # Cached content; stays None until the first successful read.
    buffer = None

    def get_buffer(self, file):
        """
        Return the file content, reading ``file`` only on the first call.

        @param file: object exposing ``read()``; ignored once content is cached
        @return: the value returned by the first ``file.read()``
        """
        if self.buffer is not None:
            return self.buffer
        self.buffer = file.read()
        return self.buffer
|
||
|
||
|
||
# QA parse handles tried in this order; one instance per handle class.
split_handles = [
    handle_class()
    for handle_class in (XlsParseQAHandle, XlsxParseQAHandle, CsvParseQAHandle)
]
|
||
|
||
|
||
def file_to_paragraph(file, save_inner_image):
    """
    Convert a file into paragraphs using the first handle that supports it.

    @param file: file object to parse
    @param save_inner_image: callback forwarded unchanged to the selected handle
    @return: whatever the selected handle returns; per the original note this is
             {
                name: file name
                paragraphs: paragraph list
             }
    @raise Exception: when no handle in ``split_handles`` supports the file
    """
    # One shared reader so the file content is read at most once across handles.
    buffer_reader = FileBufferHandle().get_buffer
    matching_handle = next(
        (candidate for candidate in split_handles if candidate.support(file, buffer_reader)),
        None,
    )
    if matching_handle is None:
        raise Exception(_("Unsupported file format"))
    return matching_handle.handle(file, buffer_reader, save_inner_image)
|
||
|
||
|
||
def is_valid_uuid(uuid_str: str):
    """
    Check whether a string can be parsed as a UUID.

    @param uuid_str: the string to validate
    @return: bool, False when ``uuid.UUID`` rejects the string with ValueError
    """
    parsed_ok = True
    try:
        uuid.UUID(uuid_str)
    except ValueError:
        parsed_ok = False
    return parsed_ok
|
||
|
||
|
||
def get_image_list(result_list: list, zip_files: List[str]):
    """
    Collect the archive images referenced by the parsed paragraphs.

    Each markdown image found in a paragraph's ``content`` is resolved relative
    to the ``name`` of the result it belongs to. References whose resolved path
    is not an entry of ``zip_files`` are ignored. When the path is
    ``oss/file/<uuid>`` or ``oss/image/<uuid>`` the existing id is reused and
    the content is left untouched; otherwise a new id is generated and the
    reference inside the paragraph content is rewritten to
    ``./oss/file/<new id>`` (the paragraph dict is modified in place).

    @param result_list: parsed results, read through ``get('name')`` and
                        ``get('paragraphs', [])``
    @param zip_files: entry names of the archive
    @return: list of ``{'source_file': <archive path>, 'image_id': <id>}``,
             one entry per matching reference
    """
    # Membership is tested once per image reference, so build the lookup once.
    archive_entries = set(zip_files)
    image_file_list = []
    for result in result_list:
        for p in result.get('paragraphs', []):
            content: str = p.get('content', '')
            for image in parse_md_image(content):
                # Raw string: "\(" in a plain literal is an invalid escape sequence.
                search = re.search(r"\(.*\)", image)
                if not search:
                    continue
                source_image_path = search.group().replace('(', '').replace(')', '')
                # Treat a leading "/" as relative so urljoin resolves the
                # reference against the document's own location in the archive.
                relative_path = ('.' + source_image_path
                                 if source_image_path.startswith('/')
                                 else source_image_path)
                image_path = urljoin(result.get('name'), relative_path)
                if image_path not in archive_entries:
                    continue

                # Reuse the id embedded in the path when it is a valid uuid.
                # Both accepted prefixes are stripped; previously only
                # 'oss/file/' was, so 'oss/image/<uuid>' never matched.
                image_id = None
                for prefix in ('oss/file/', 'oss/image/'):
                    if image_path.startswith(prefix):
                        candidate_id = image_path[len(prefix):]
                        if is_valid_uuid(candidate_id):
                            image_id = candidate_id
                        break

                if image_id is None:
                    image_id = str(uuid.uuid7())
                    content = content.replace(source_image_path, f'./oss/file/{image_id}')
                    p['content'] = content

                image_file_list.append({'source_file': image_path,
                                        'image_id': image_id})

    return image_file_list
|
||
|
||
|
||
def filter_image_file(result_list: list, image_list):
    """
    Drop the results whose ``name`` is the source file of a collected image.

    @param result_list: parsed results, each read through ``get('name', '')``
    @param image_list: image descriptors, each read through ``get('source_file')``
    @return: new list with the remaining results, in their original order
    """
    # A set gives constant-time lookups instead of scanning a list per result.
    image_source_files = {image.get('source_file') for image in image_list}
    return [r for r in result_list if r.get('name', '') not in image_source_files]
|
||
|
||
|
||
class ZipParseQAHandle(BaseParseQAHandle):
    """QA parse handle for zip archives: parses each member and stores referenced images."""

    def handle(self, file, get_buffer, save_image):
        """
        Parse every supported member of the archive and save referenced images.

        @param file: the uploaded zip file
        @param get_buffer: callable returning the raw content of ``file``
        @param save_image: callback; passed to the member handles and called
                           once here with the ``File`` objects built from the
                           archive images
        @return: list of parse results, excluding any whose name is the source
                 file of a collected image
        """
        buffer = get_buffer(file)
        bytes_io = io.BytesIO(buffer)
        result = []
        # Open the zip archive
        with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
            # Entry names contained in the archive
            files = zip_ref.namelist()
            for member_name in files:
                # Skip directory entries and the macOS-specific metadata folder
                if member_name.endswith('/') or member_name.startswith('__MACOSX'):
                    continue
                with zip_ref.open(member_name) as f:
                    try:
                        value = file_to_paragraph(f, save_image)
                    except Exception:
                        # Deliberate best effort: a member that cannot be
                        # parsed must not abort the whole archive.
                        continue
                    if isinstance(value, list):
                        result.extend(value)
                    else:
                        result.append(value)

            image_list = get_image_list(result, files)
            result = filter_image_file(result, image_list)

            image_mode_list = []
            # get_image_list yields one entry per reference, so an image used
            # several times can repeat an id; build a single File per id.
            seen_image_ids = set()
            for image in image_list:
                image_id = image.get('image_id')
                if image_id in seen_image_ids:
                    continue
                seen_image_ids.add(image_id)
                with zip_ref.open(image.get('source_file')) as f:
                    image_mode_list.append(File(
                        id=image_id,
                        file_name=os.path.basename(image.get('source_file')),
                        meta={'debug': False, 'content': f.read()}
                    ))
            save_image(image_mode_list)
        return result

    def support(self, file, get_buffer):
        """
        Tell whether the file is a zip archive, judged by its name only.

        @param file: object exposing a ``name`` attribute
        @param get_buffer: unused
        @return: True when the lower-cased name ends with ``.zip``
        """
        file_name: str = file.name.lower()
        # The name is already lower-cased, so a separate ".ZIP" check is redundant.
        return file_name.endswith(".zip")
|