diff --git a/apps/common/util/split_model.py b/apps/common/util/split_model.py index da7a8976a..ce8a6946e 100644 --- a/apps/common/util/split_model.py +++ b/apps/common/util/split_model.py @@ -19,6 +19,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor): :param text: 文本 :param level_content_list: 拆分的title数组 :param level_content_index: 指定的下标 + :param cursor: 开始的下标位置 :return: 拆分后的文本数据 """ start_content: str = level_content_list[level_content_index].get('content') @@ -26,7 +27,7 @@ def get_level_block(text, level_content_list, level_content_index, cursor): level_content_list) else None start_index = text.index(start_content, cursor) end_index = text.index(next_content, start_index + 1) if next_content is not None else len(text) - return text[start_index:end_index].lstrip(level_content_list[level_content_index]['content']), end_index + return text[start_index+len(start_content):end_index], end_index def to_tree_obj(content, state='title'): @@ -148,10 +149,10 @@ def to_block_paragraph(tree_data_list: List[dict]): def parse_title_level(text, content_level_pattern: List, index): - if len(content_level_pattern) == index: + if index >= len(content_level_pattern): return [] result = parse_level(text, content_level_pattern[index]) - if len(result) == 0 and len(content_level_pattern) > index + 1: + if len(result) == 0 and len(content_level_pattern) > index: return parse_title_level(text, content_level_pattern, index + 1) return result @@ -213,46 +214,48 @@ def group_by(list_source: List, key): return result -def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain): +def result_tree_to_paragraph(result_tree: List[dict], result, parent_chain, with_filter: bool): """ 转换为分段对象 :param result_tree: 解析文本的树 :param result: 传[] 用于递归 :param parent_chain: 传[] 用户递归存储数据 + :param with_filter: 是否过滤block :return: List[{'problem':'xx','content':'xx'}] """ for item in result_tree: if item.get('state') == 'block': - result.append({'title': " ".join(parent_chain), 'content': item.get("content")}) + result.append({'title': " ".join(parent_chain), + 'content': filter_special_char(item.get("content")) if with_filter else item.get("content")}) children = item.get("children") if children is not None and len(children) > 0: - result_tree_to_paragraph(children, result, [*parent_chain, item.get('content')]) + result_tree_to_paragraph(children, result, + [*parent_chain, remove_special_symbol(item.get('content'))], with_filter) return result -def post_handler_paragraph(content: str, limit: int, with_filter: bool): +def post_handler_paragraph(content: str, limit: int): + """ + 根据文本的最大字符分段 + :param content: 需要分段的文本字段 + :param limit: 最大分段字符 + :return: 分段后数据 """ - 根据文本的最大字符分段 - :param with_filter: 是否过滤特殊字符 - :param content: 需要分段的文本字段 - :param limit: 最大分段字符 - :return: 分段后数据 - """ - split_list = content.split('\n') result = [] - temp_char = '' - for split in split_list: + temp_char, start = '', 0 + while (pos := content.find("\n", start)) != -1: + split, start = content[start:pos + 1], pos + 1 if len(temp_char + split) > limit: result.append(temp_char) temp_char = '' - temp_char = temp_char + split + '\n' + temp_char = temp_char + split + temp_char = temp_char + content[start:] if len(temp_char) > 0: result.append(temp_char) + pattern = "[\\S\\s]{1," + str(limit) + '}' # 如果\n 单段超过限制,则继续拆分 - s = list(map(lambda row: filter_special_char(row) if with_filter else row, list( - reduce(lambda x, y: [*x, *y], list(map(lambda row: list(re.findall(pattern, row)), result)), [])))) - return s + return reduce(lambda x, y: [*x, *y], map(lambda row: re.findall(pattern, row), result), []) replace_map = { @@ -293,40 +296,24 @@ class SplitModel: :param index: 从那个正则开始解析 :return: 解析后的树形结果数据 """ - if len(self.content_level_pattern) == index: - return - level_content_list = parse_title_level(text, self.content_level_pattern, 0) + level_content_list = parse_title_level(text, self.content_level_pattern, index) + if len(level_content_list) == 0: + return list(map(lambda row: to_tree_obj(row, 'block'), post_handler_paragraph(text, limit=self.limit))) + if index == 0 and text.lstrip().index(level_content_list[0]["content"].lstrip()) != 0: + level_content_list.insert(0, to_tree_obj("")) + cursor = 0 for i in range(len(level_content_list)): block, cursor = get_level_block(text, level_content_list, i, cursor) - children = self.parse_to_tree(text=block, - index=index + 1) - if children is not None and len(children) > 0: - level_content_list[i]['children'] = children - else: - if len(block) > 0: - level_content_list[i]['children'] = list( - map(lambda row: to_tree_obj(row, 'block'), - post_handler_paragraph(block, with_filter=self.with_filter, limit=self.limit))) - if len(level_content_list) > 0: - end_index = text.index(level_content_list[0].get('content')) - if end_index == 0: - return level_content_list - other_content = text[0:end_index] - children = self.parse_to_tree(text=other_content, - index=index) - if len(children) > 0: - level_content_list = [*level_content_list, *children] - else: - if len(other_content.strip()) > 0: - level_content_list = [*level_content_list, *list( - map(lambda row: to_tree_obj(row, 'block'), - post_handler_paragraph(other_content, with_filter=self.with_filter, limit=self.limit)))] - else: - if len(text.strip()) > 0: - level_content_list = [*level_content_list, *list( - map(lambda row: to_tree_obj(row, 'block'), - post_handler_paragraph(text, with_filter=self.with_filter, limit=self.limit)))] + if len(block) == 0: + level_content_list[i]['children'] = [to_tree_obj("", "block")] + continue + children = self.parse_to_tree(text=block, index=index + 1) + level_content_list[i]['children'] = children + first_child_idx_in_block = block.lstrip().index(children[0]["content"].lstrip()) + if first_child_idx_in_block != 0: + inner_children = self.parse_to_tree(block[:first_child_idx_in_block], index + 1) + level_content_list[i]['children'].extend(inner_children) return level_content_list def parse(self, text: str): @@ -339,7 +326,7 @@ class SplitModel: text = text.replace('\r', '\n') text = text.replace("\0", '') result_tree = self.parse_to_tree(text, 0) - result = result_tree_to_paragraph(result_tree, [], []) + result = result_tree_to_paragraph(result_tree, [], [], self.with_filter) return [item for item in [self.post_reset_paragraph(row) for row in result] if 'content' in item and len(item.get('content').strip()) > 0]