fix: 优化文档分割处理 (#814)

This commit is contained in:
gcalgoz 2024-07-19 17:33:18 +08:00 committed by GitHub
parent d3d09b10ec
commit 4d8ac28674
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -19,6 +19,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor):
:param text: 文本
:param level_content_list: 拆分的title数组
:param level_content_index: 指定的下标
:param cursor: 开始的下标位置
:return: 拆分后的文本数据
"""
start_content: str = level_content_list[level_content_index].get('content')
@ -26,7 +27,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor):
level_content_list) else None
start_index = text.index(start_content, cursor)
end_index = text.index(next_content, start_index + 1) if next_content is not None else len(text)
return text[start_index:end_index].lstrip(level_content_list[level_content_index]['content']), end_index
return text[start_index+len(start_content):end_index], end_index
def to_tree_obj(content, state='title'):
@ -148,10 +149,10 @@ def to_block_paragraph(tree_data_list: List[dict]):
def parse_title_level(text, content_level_pattern: List, index):
if len(content_level_pattern) == index:
if index >= len(content_level_pattern):
return []
result = parse_level(text, content_level_pattern[index])
if len(result) == 0 and len(content_level_pattern) > index + 1:
if len(result) == 0 and len(content_level_pattern) > index:
return parse_title_level(text, content_level_pattern, index + 1)
return result
@ -213,46 +214,48 @@ def group_by(list_source: List, key):
return result
def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain):
def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain, with_filter: bool):
"""
转换为分段对象
:param result_tree: 解析文本的树
:param result: [] 用于递归
:param parent_chain: [] 用户递归存储数据
:param with_filter: 是否过滤block
:return: List[{'problem':'xx','content':'xx'}]
"""
for item in result_tree:
if item.get('state') == 'block':
result.append({'title': " ".join(parent_chain), 'content': item.get("content")})
result.append({'title': " ".join(parent_chain),
'content': filter_special_char(item.get("content")) if with_filter else item.get("content")})
children = item.get("children")
if children is not None and len(children) > 0:
result_tree_to_paragraph(children, result, [*parent_chain, item.get('content')])
result_tree_to_paragraph(children, result,
[*parent_chain, remove_special_symbol(item.get('content'))], with_filter)
return result
def post_handler_paragraph(content: str, limit: int, with_filter: bool):
def post_handler_paragraph(content: str, limit: int):
"""
根据文本的最大字符分段
:param content: 需要分段的文本字段
:param limit: 最大分段字符
:return: 分段后数据
"""
根据文本的最大字符分段
:param with_filter: 是否过滤特殊字符
:param content: 需要分段的文本字段
:param limit: 最大分段字符
:return: 分段后数据
"""
split_list = content.split('\n')
result = []
temp_char = ''
for split in split_list:
temp_char, start = '', 0
while (pos := content.find("\n", start)) != -1:
split, start = content[start:pos + 1], pos + 1
if len(temp_char + split) > limit:
result.append(temp_char)
temp_char = ''
temp_char = temp_char + split + '\n'
temp_char = temp_char + split
temp_char = temp_char + content[start:]
if len(temp_char) > 0:
result.append(temp_char)
pattern = "[\\S\\s]{1," + str(limit) + '}'
# 如果\n 单段超过限制,则继续拆分
s = list(map(lambda row: filter_special_char(row) if with_filter else row, list(
reduce(lambda x, y: [*x, *y], list(map(lambda row: list(re.findall(pattern, row)), result)), []))))
return s
return reduce(lambda x, y: [*x, *y], map(lambda row: re.findall(pattern, row), result), [])
replace_map = {
@ -293,40 +296,24 @@ class SplitModel:
:param index: 从那个正则开始解析
:return: 解析后的树形结果数据
"""
if len(self.content_level_pattern) == index:
return
level_content_list = parse_title_level(text, self.content_level_pattern, 0)
level_content_list = parse_title_level(text, self.content_level_pattern, index)
if len(level_content_list) == 0:
return list(map(lambda row: to_tree_obj(row, 'block'), post_handler_paragraph(text, limit=self.limit)))
if index == 0 and text.lstrip().index(level_content_list[0]["content"].lstrip()) != 0:
level_content_list.insert(0, to_tree_obj(""))
cursor = 0
for i in range(len(level_content_list)):
block, cursor = get_level_block(text, level_content_list, i, cursor)
children = self.parse_to_tree(text=block,
index=index + 1)
if children is not None and len(children) > 0:
level_content_list[i]['children'] = children
else:
if len(block) > 0:
level_content_list[i]['children'] = list(
map(lambda row: to_tree_obj(row, 'block'),
post_handler_paragraph(block, with_filter=self.with_filter, limit=self.limit)))
if len(level_content_list) > 0:
end_index = text.index(level_content_list[0].get('content'))
if end_index == 0:
return level_content_list
other_content = text[0:end_index]
children = self.parse_to_tree(text=other_content,
index=index)
if len(children) > 0:
level_content_list = [*level_content_list, *children]
else:
if len(other_content.strip()) > 0:
level_content_list = [*level_content_list, *list(
map(lambda row: to_tree_obj(row, 'block'),
post_handler_paragraph(other_content, with_filter=self.with_filter, limit=self.limit)))]
else:
if len(text.strip()) > 0:
level_content_list = [*level_content_list, *list(
map(lambda row: to_tree_obj(row, 'block'),
post_handler_paragraph(text, with_filter=self.with_filter, limit=self.limit)))]
if len(block) == 0:
level_content_list[i]['children'] = [to_tree_obj("", "block")]
continue
children = self.parse_to_tree(text=block, index=index + 1)
level_content_list[i]['children'] = children
first_child_idx_in_block = block.lstrip().index(children[0]["content"].lstrip())
if first_child_idx_in_block != 0:
inner_children = self.parse_to_tree(block[:first_child_idx_in_block], index + 1)
level_content_list[i]['children'].extend(inner_children)
return level_content_list
def parse(self, text: str):
@ -339,7 +326,7 @@ class SplitModel:
text = text.replace('\r', '\n')
text = text.replace("\0", '')
result_tree = self.parse_to_tree(text, 0)
result = result_tree_to_paragraph(result_tree, [], [])
result = result_tree_to_paragraph(result_tree, [], [], self.with_filter)
return [item for item in [self.post_reset_paragraph(row) for row in result] if
'content' in item and len(item.get('content').strip()) > 0]