MaxKB/apps/common/handle/impl/qa/md_parse_qa_handle.py

141 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file: md_parse_qa_handle.py
@date: 2024/5/21 14:59
@desc:
"""
import re
import traceback
from charset_normalizer import detect
from common.handle.base_parse_qa_handle import BaseParseQAHandle, get_title_row_index_dict, get_row_value
from common.utils.logger import maxkb_logger
class MarkdownParseQAHandle(BaseParseQAHandle):
    """Extract Q&A paragraphs from Markdown files that store them in pipe tables.

    A level-two heading (``## name``) starts a new "sheet". Every pipe table
    found inside a sheet contributes one paragraph per data row.
    """

    # A delimiter row such as ``| --- | :---: |``: after the leading pipe the
    # whole line may contain only pipes, whitespace, colons and dashes, and it
    # must contain at least one dash.
    _SEPARATOR_ROW = re.compile(r'\|[\s:|-]*-[\s:|-]*')
    # A pipe that is not preceded by a backslash separates two cells.
    _CELL_DELIMITER = re.compile(r'(?<!\\)\|')

    def support(self, file, get_buffer):
        """Return True when the file name ends with ``.md`` or ``.markdown``.

        The comparison is case-insensitive. ``get_buffer`` is accepted for
        interface compatibility and is not used.
        """
        file_name: str = file.name.lower()
        return file_name.endswith((".md", ".markdown"))

    def _split_row(self, line):
        """Split one stripped table line (starting with ``|``) into cells.

        The leading pipe and an optional, unescaped trailing pipe are removed
        before splitting, so a row written without a closing pipe keeps its
        last cell. Escaped pipes (``\\|``) stay inside their cell and are
        unescaped to ``|``. A line consisting of a single ``|`` yields ``[]``.
        """
        body = line[1:]
        if not body:
            return []
        if body.endswith('|') and not body.endswith('\\|'):
            body = body[:-1]
        return [cell.strip().replace('\\|', '|')
                for cell in self._CELL_DELIMITER.split(body)]

    def parse_markdown_table(self, content):
        """Parse the Markdown pipe tables contained in ``content``.

        :param content: Markdown text.
        :return: a list of tables; each table is a list of rows (header row
                 first) and each row is a list of stripped cell strings.
                 Tables without at least one data row are omitted.
        """
        tables = []
        lines = content.split('\n')
        total = len(lines)
        i = 0
        while i < total:
            line = lines[i].strip()
            # A table starts at the first line that begins with a pipe.
            if not line.startswith('|'):
                i += 1
                continue
            # The first line of the table is its header.
            table_data = [self._split_row(line)]
            i += 1
            # Skip the delimiter row (e.g. ``| --- | --- |``) when present.
            # The whole line must look like a delimiter, so a data row whose
            # first cell is blank or a dash is not swallowed by mistake.
            if i < total and self._SEPARATOR_ROW.fullmatch(lines[i].strip()):
                i += 1
            # Consume data rows until a line that does not start with a pipe.
            while i < total:
                line = lines[i].strip()
                if not line.startswith('|'):
                    break
                row = self._split_row(line)
                if row:
                    table_data.append(row)
                i += 1
            # Keep the table only if it has a header and at least one data row.
            if len(table_data) > 1:
                tables.append(table_data)
        return tables

    def handle(self, file, get_buffer, save_image):
        """Read a Markdown file and convert its tables into paragraph dicts.

        :param file: object exposing ``name``; passed to ``get_buffer``.
        :param get_buffer: callable returning the raw bytes of ``file``.
        :param save_image: accepted for interface compatibility; not used.
        :return: a list with one ``{'name': ..., 'paragraphs': [...]}`` dict
                 per sheet. Each paragraph holds ``title`` (max 255 chars),
                 ``content`` (max 102400 chars) and ``problem_list``. If
                 anything raises while decoding or parsing, the error is
                 logged and a single entry named after the file with no
                 paragraphs is returned.
        """
        buffer = get_buffer(file)
        try:
            # Detect the encoding; fall back to UTF-8 when detection fails.
            encoding = detect(buffer)['encoding']
            content = buffer.decode(encoding if encoding else 'utf-8')
            # A byte-order mark that survives decoding is not whitespace, so
            # str.strip() would leave it glued to the first heading/table row
            # and that line would not be recognised.
            if content.startswith('\ufeff'):
                content = content[1:]
            result = []
            # Split the document into sheets, then parse each sheet's tables.
            for sheet_name, sheet_content in self.split_by_sheets(content):
                paragraph_list = []
                for table in self.parse_markdown_table(sheet_content):
                    if len(table) < 2:
                        continue
                    # Map the header row to column indexes. Presumably keyed by
                    # 'title' / 'content' / 'problem_list' -- the helper is
                    # defined outside this file.
                    title_row_index_dict = get_title_row_index_dict(table[0])
                    for row in table[1:]:
                        content_text = get_row_value(row, title_row_index_dict, 'content')
                        if content_text is None:
                            continue
                        problem = get_row_value(row, title_row_index_dict, 'problem_list')
                        problem = str(problem) if problem is not None else ''
                        # NOTE(review): a table cell never contains '\n' because
                        # rows are split on newlines, so this yields at most one
                        # problem per row -- confirm whether another separator
                        # (e.g. <br>) should be supported.
                        problem_list = [{'content': p[0:255]}
                                        for p in problem.split('\n') if len(p.strip()) > 0]
                        title = get_row_value(row, title_row_index_dict, 'title')
                        title = str(title) if title is not None else ''
                        paragraph_list.append({
                            'title': title[0:255],
                            'content': content_text[0:102400],
                            'problem_list': problem_list
                        })
                result.append({'name': sheet_name, 'paragraphs': paragraph_list})
            return result if result else [{'name': file.name, 'paragraphs': []}]
        except Exception as e:
            # Best-effort import: log the failure and return an empty document.
            maxkb_logger.error(f"Error processing Markdown file {file.name}: {e}, {traceback.format_exc()}")
            return [{'name': file.name, 'paragraphs': []}]

    def split_by_sheets(self, content):
        """Split ``content`` into sheets at level-two headings (``## name``).

        :param content: Markdown text.
        :return: a list of ``(sheet_name, sheet_text)`` tuples. When no
                 ``## `` heading exists, a single ``('default', content)``
                 entry is returned.

        NOTE(review): when at least one heading exists, text that precedes the
        first heading is discarded -- confirm that this is intended.
        """
        sheets = []
        current_sheet_name = None
        current_content = []
        for line in content.split('\n'):
            stripped = line.strip()
            # A level-two heading names a new sheet.
            if stripped.startswith('## '):
                if current_sheet_name is not None:
                    sheets.append((current_sheet_name, '\n'.join(current_content)))
                current_sheet_name = stripped[3:].strip()
                current_content = []
            else:
                current_content.append(line)
        # Flush the last open sheet.
        if current_sheet_name is not None:
            sheets.append((current_sheet_name, '\n'.join(current_content)))
        # No heading found: treat the whole document as one sheet.
        if not sheets:
            sheets.append(('default', content))
        return sheets