MaxKB/apps/common/handle/impl/text/text_split_handle.py
CaptainB 4c9756839a
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
chore: normalize with_filter parameter to boolean in split handle files
--bug=1057879 --user=刘瑞斌 【知识库】高级分段中自动清洗功能未生效 https://www.tapd.cn/62980211/s/1727744
2025-07-10 15:06:19 +08:00

66 lines
2.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file text_split_handle.py
@date2024/3/27 18:19
@desc:
"""
import re
import traceback
from typing import List
from charset_normalizer import detect
from common.handle.base_split_handle import BaseSplitHandle
from common.utils.logger import maxkb_logger
from common.utils.split_model import SplitModel
default_pattern_list = [
re.compile('(?<=^)# (?!-\\*- coding:).*|(?<=\\n)# (?!-\\*- coding:).*'),
re.compile('(?<=\\n)(?<!#)## (?!#).*|(?<=^)(?<!#)## (?!#).*'),
re.compile("(?<=\\n)(?<!#)### (?!#).*|(?<=^)(?<!#)### (?!#).*"),
re.compile("(?<=\\n)(?<!#)#### (?!#).*|(?<=^)(?<!#)#### (?!#).*"),
re.compile("(?<=\\n)(?<!#)##### (?!#).*|(?<=^)(?<!#)##### (?!#).*"),
re.compile("(?<=\\n)(?<!#)###### (?!#).*|(?<=^)(?<!#)###### (?!#).*")
]
class TextSplitHandle(BaseSplitHandle):
def support(self, file, get_buffer):
buffer = get_buffer(file)
file_name: str = file.name.lower()
if file_name.endswith(".md") or file_name.endswith('.txt') or file_name.endswith('.TXT') or file_name.endswith(
'.MD'):
return True
result = detect(buffer)
if result['encoding'] is not None and result['confidence'] is not None and result['encoding'] != 'ascii' and \
result['confidence'] > 0.5:
return True
return False
def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
buffer = get_buffer(file)
if type(limit) is str:
limit = int(limit)
if type(with_filter) is str:
with_filter = with_filter.lower() == 'true'
if pattern_list is not None and len(pattern_list) > 0:
split_model = SplitModel(pattern_list, with_filter, limit)
else:
split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit)
try:
content = buffer.decode(detect(buffer)['encoding'])
except BaseException as e:
maxkb_logger.error(f"Error processing TEXT file {file.name}: {e}, {traceback.format_exc()}")
return {'name': file.name, 'content': []}
return {'name': file.name, 'content': split_model.parse(content)}
def get_content(self, file, save_image):
buffer = file.read()
try:
return buffer.decode(detect(buffer)['encoding'])
except BaseException as e:
traceback.print_exception(e)
return f'{e}'