feat: enhance Markdown parsing by splitting content into sheets and adding sheet titles

This commit is contained in:
CaptainB 2025-12-04 17:39:58 +08:00
parent 8fd568dd97
commit 2ce4f9af92
4 changed files with 73 additions and 34 deletions

View File

@ -110,7 +110,7 @@ class BaseDocumentSplitNode(IDocumentSplitNode):
'source_file_id': source_file_id,
'source_url': file_name,
}
item['name'] = file_name
item['name'] = item.get('name', file_name)
item['source_file_id'] = source_file_id
item['paragraphs'] = item.pop('content', item.get('paragraphs', []))

View File

@ -66,43 +66,75 @@ class MarkdownParseQAHandle(BaseParseQAHandle):
encoding = detect(buffer)['encoding']
content = buffer.decode(encoding if encoding else 'utf-8')
# 解析 Markdown 表格
tables = self.parse_markdown_table(content)
# 按 sheet 分割内容
sheet_sections = self.split_by_sheets(content)
if not tables:
return [{'name': file.name, 'paragraphs': []}]
result = []
paragraph_list = []
for sheet_name, sheet_content in sheet_sections:
# 解析该 sheet 的表格
tables = self.parse_markdown_table(sheet_content)
# 处理每个表格
for table in tables:
if len(table) < 2:
continue
paragraph_list = []
title_row_list = table[0]
title_row_index_dict = get_title_row_index_dict(title_row_list)
# 处理表格的每一行数据
for row in table[1:]:
content = get_row_value(row, title_row_index_dict, 'content')
if content is None:
# 处理每个表格
for table in tables:
if len(table) < 2:
continue
problem = get_row_value(row, title_row_index_dict, 'problem_list')
problem = str(problem) if problem is not None else ''
problem_list = [{'content': p[0:255]} for p in problem.split('\n') if len(p.strip()) > 0]
title_row_list = table[0]
title_row_index_dict = get_title_row_index_dict(title_row_list)
title = get_row_value(row, title_row_index_dict, 'title')
title = str(title) if title is not None else ''
# 处理表格的每一行数据
for row in table[1:]:
content_text = get_row_value(row, title_row_index_dict, 'content')
if content_text is None:
continue
paragraph_list.append({
'title': title[0:255],
'content': content[0:102400],
'problem_list': problem_list
})
problem = get_row_value(row, title_row_index_dict, 'problem_list')
problem = str(problem) if problem is not None else ''
problem_list = [{'content': p[0:255]} for p in problem.split('\n') if len(p.strip()) > 0]
return [{'name': file.name, 'paragraphs': paragraph_list}]
title = get_row_value(row, title_row_index_dict, 'title')
title = str(title) if title is not None else ''
paragraph_list.append({
'title': title[0:255],
'content': content_text[0:102400],
'problem_list': problem_list
})
result.append({'name': sheet_name, 'paragraphs': paragraph_list})
return result if result else [{'name': file.name, 'paragraphs': []}]
except Exception as e:
maxkb_logger.error(f"Error processing Markdown file {file.name}: {e}, {traceback.format_exc()}")
return [{'name': file.name, 'paragraphs': []}]
def split_by_sheets(self, content, default_name='default'):
    """Split Markdown text into ``(sheet_name, sheet_content)`` sections.

    A line that, once stripped of surrounding whitespace, starts with
    ``'## '`` opens a new sheet; the remainder of that line (stripped) is
    the sheet name. The heading line itself is not part of the sheet
    content. Lines are split and re-joined on ``'\\n'`` so the content of
    each section is otherwise returned unchanged.

    Text that appears before the first heading used to be dropped
    whenever at least one heading was present. It is now kept as a
    leading section named ``default_name`` when it contains anything
    other than whitespace, so tables placed above the first heading
    are not silently lost.

    :param content: full Markdown text to split.
    :param default_name: name given to content that is not under any
        ``## `` heading (the whole text when no heading exists, or the
        non-blank preamble before the first heading).
    :return: list of ``(sheet_name, sheet_content)`` tuples in document
        order; always contains at least one entry.
    """
    sheets = []
    current_name = None  # stays None until the first '## ' heading is seen
    current_lines = []

    for line in content.split('\n'):
        stripped = line.strip()
        if not stripped.startswith('## '):
            current_lines.append(line)
            continue

        # A heading closes whatever section was being accumulated.
        if current_name is not None:
            sheets.append((current_name, '\n'.join(current_lines)))
        else:
            # Preamble before the first heading: keep it only when it
            # carries real text, so blank leading lines add no section.
            preamble = '\n'.join(current_lines)
            if preamble.strip():
                sheets.append((default_name, preamble))
        current_name = stripped[3:].strip()
        current_lines = []

    # No heading at all: the whole text is a single unnamed section.
    if current_name is None:
        return [(default_name, content)]

    # Flush the last open sheet.
    sheets.append((current_name, '\n'.join(current_lines)))
    return sheets

View File

@ -90,14 +90,17 @@ class XlsxParseTableHandle(BaseParseTableHandle):
maxkb_logger.error(f'Exception: {e}')
image_dict = {}
md_tables = ''
# 如果未指定 sheet_name则使用第一个工作表
# 遍历所有工作表
for sheetname in workbook.sheetnames:
sheet = workbook[sheetname] if sheetname else workbook.active
sheet = workbook[sheetname]
rows = self.fill_merged_cells(sheet, image_dict)
if len(rows) == 0:
continue
# 提取表头和内容
# 添加 sheet 名称作为标题
md_tables += f'## {sheetname}\n\n'
# 提取表头和内容
headers = [f"{key}" for key, value in rows[0].items()]
# 构建 Markdown 表格

View File

@ -17,6 +17,7 @@ from common.handle.base_split_handle import BaseSplitHandle
from common.handle.impl.common_handle import xlsx_embed_cells_images
from common.utils.logger import maxkb_logger
splitter = '\n`-----------------------------------`\n'
def post_cell(image_dict, cell_value):
image = image_dict.get(cell_value, None)
@ -134,14 +135,17 @@ class XlsxSplitHandle(BaseSplitHandle):
maxkb_logger.error(f'Exception: {e}')
image_dict = {}
md_tables = ''
# 如果未指定 sheet_name则使用第一个工作表
# 遍历所有工作表
for sheetname in workbook.sheetnames:
sheet = workbook[sheetname] if sheetname else workbook.active
sheet = workbook[sheetname]
rows = self.fill_merged_cells(sheet, image_dict)
if len(rows) == 0:
continue
# 提取表头和内容
# 添加 sheet 名称作为标题
md_tables += f'## {sheetname}\n\n'
# 提取表头和内容
headers = [f"{key}" for key, value in rows[0].items()]
# 构建 Markdown 表格