feat: implement CSV, XLS, and XLSX content extraction with Markdown formatting

This commit is contained in:
CaptainB 2025-12-04 14:29:06 +08:00
parent c4dd09ca1e
commit a197cfc69d
4 changed files with 198 additions and 4 deletions

View File

@ -68,7 +68,38 @@ class CsvSplitHandle(BaseSplitHandle):
return result
def get_content(self, file, save_image):
    """
    Render a CSV file as one Markdown table.

    The first non-blank row becomes the table header. Every later row is
    padded with empty cells, or truncated, to the header's width. Line
    breaks inside a cell are written as ``<br>`` so that each record stays
    on a single table line.

    :param file: binary file-like object; ``file.read()`` supplies the raw
                 bytes and ``file.name`` is used only in the error log
    :param save_image: not used by this handler; kept for interface parity
    :return: the Markdown table, or "" when the file holds no rows or
             cannot be parsed
    """

    def to_md_cell(cell):
        # A raw newline would end the Markdown table row early.
        return cell.replace('\n', '<br>').replace('\r', '')

    buffer = file.read()
    if not buffer:
        # Nothing to decode, so charset detection is pointless.
        return ""
    try:
        # The detector may report no encoding at all; fall back to UTF-8
        # instead of letting TextIOWrapper pick the locale default.
        encoding = detect(buffer)['encoding'] or 'utf-8'
        reader = csv.reader(io.TextIOWrapper(io.BytesIO(buffer), encoding=encoding))
        # Drop blank lines before choosing the header: a blank first line
        # would otherwise yield a zero-width header and every data row
        # would be truncated to nothing.
        rows = [row for row in reader if row]
        if not rows:
            return ""
        header = [to_md_cell(cell) for cell in rows[0]]
        width = len(header)
        md_lines = [
            '| ' + ' | '.join(header) + ' |',
            '| ' + ' | '.join(['---'] * width) + ' |',
        ]
        for row in rows[1:]:
            # Pad short rows, then cut long ones, so the column count
            # always matches the header.
            cells = [to_md_cell(cell) for cell in row] + [''] * (width - len(row))
            md_lines.append('| ' + ' | '.join(cells[:width]) + ' |')
        return '\n'.join(md_lines)
    except Exception as e:
        maxkb_logger.error(f"Error processing CSV file {file.name}: {e}, {traceback.format_exc()}")
        return ""
def support(self, file, get_buffer):
file_name: str = file.name.lower()

View File

@ -75,7 +75,36 @@ class XlsSplitHandle(BaseSplitHandle):
return [{'name': file.name, 'content': []}]
def get_content(self, file, save_image):
    """
    Render every non-empty sheet of an .xls workbook as a Markdown table.

    Row 0 of each sheet is used as the table header and the remaining rows
    as data. Tables are concatenated, each followed by a blank gap.

    :param file: binary file-like object; ``file.read()`` supplies the
                 workbook bytes
    :param save_image: not used by this handler; kept for interface parity
    :return: the concatenated Markdown tables, or ``'error: <message>'``
             when the workbook cannot be processed
    """

    def to_md_row(values):
        # Cell values are not always strings, so convert before joining;
        # only None is rendered as empty so that 0 / 0.0 stay visible.
        cells = [
            '' if value is None
            else str(value).replace('\r\n', '<br>').replace('\n', '<br>')
            for value in values
        ]
        return '| ' + ' | '.join(cells) + ' |\n'

    try:
        # Open the .xls workbook from memory.
        workbook = xlrd.open_workbook(file_contents=file.read(), formatting_info=True)
        md_tables = []
        for sheet in workbook.sheets():
            # Skip sheets that have no rows or no columns.
            if sheet.nrows == 0 or sheet.ncols == 0:
                continue
            headers = sheet.row_values(0)
            table_lines = [
                to_md_row(headers),
                '| ' + ' | '.join(['---'] * len(headers)) + ' |\n',
            ]
            table_lines.extend(
                to_md_row(sheet.row_values(row_idx)) for row_idx in range(1, sheet.nrows)
            )
            md_tables.append(''.join(table_lines) + '\n\n')
        return ''.join(md_tables)
    except Exception as e:
        maxkb_logger.error(f'excel split handle error: {e}')
        return f'error: {e}'
def support(self, file, get_buffer):
file_name: str = file.name.lower()

View File

@ -11,6 +11,7 @@ import traceback
from typing import List
import openpyxl
from openpyxl import load_workbook
from common.handle.base_split_handle import BaseSplitHandle
from common.handle.impl.common_handle import xlsx_embed_cells_images
@ -63,6 +64,40 @@ def handle_sheet(file_name, sheet, image_dict, limit: int):
class XlsxSplitHandle(BaseSplitHandle):
def fill_merged_cells(self, sheet, image_dict):
    """
    Convert a worksheet into a list of ``{header: value}`` dicts, one per
    data row (row 2 onwards).

    Empty cells that lie inside a merged range take the value of that
    range's top-left cell. A value that is a key of ``image_dict`` is
    replaced by a Markdown image link built from the mapped object's ``id``.

    :param sheet: worksheet-like object supporting ``sheet[row]``,
                  ``iter_rows`` and ``merged_cells.ranges``
    :param image_dict: mapping from cell value to an object with an ``id``
    :return: list of row dicts whose keys are the (unique) header captions
    """
    # Row 1 supplies the dict keys. Keys must be unique, otherwise columns
    # that share a caption would overwrite each other in every row dict.
    headers = []
    seen = set()
    for idx, cell in enumerate(sheet[1]):
        # A blank header gets a whitespace-only placeholder whose length
        # depends on the column position.
        key = ' ' * (idx + 1) if cell.value is None else cell.value
        # Disambiguate repeated captions with trailing spaces, the same
        # whitespace trick the blank-header placeholder relies on.
        while key in seen:
            key = f'{key} '
        seen.add(key)
        headers.append(key)

    # Loop-invariant: look the merged ranges up once, not once per cell.
    merged_ranges = list(sheet.merged_cells.ranges)

    data = []
    for row in sheet.iter_rows(min_row=2, values_only=False):
        row_data = {}
        for col_idx, cell in enumerate(row):
            cell_value = cell.value
            if cell_value is None:
                # An empty cell inside a merged range inherits the value
                # of the range's top-left cell.
                for merged_range in merged_ranges:
                    if cell.coordinate in merged_range:
                        cell_value = sheet[merged_range.min_row][merged_range.min_col - 1].value
                        break
            image = image_dict.get(cell_value, None)
            if image is not None:
                cell_value = f'![](./oss/file/{image.id})'
            row_data[headers[col_idx]] = cell_value
        data.append(row_data)
    return data
def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
buffer = get_buffer(file)
try:
@ -88,7 +123,41 @@ class XlsxSplitHandle(BaseSplitHandle):
return [{'name': file.name, 'content': []}]
def get_content(self, file, save_image):
    """
    Render every sheet of an .xlsx workbook as a Markdown table.

    Row dicts come from ``fill_merged_cells``; their keys form the header
    and their values the data rows. Sheets that yield no data rows are
    skipped. Images returned by ``xlsx_embed_cells_images`` are handed to
    ``save_image``; a failure there is logged and the text is still
    extracted without images.

    :param file: the workbook source passed to ``load_workbook`` and
                 ``xlsx_embed_cells_images``
    :param save_image: callback that receives the extracted image objects
    :return: the concatenated Markdown tables, or ``'error: <message>'``
             when the workbook cannot be processed
    """
    try:
        # Load the Excel workbook.
        workbook = load_workbook(file)
        try:
            image_dict: dict = xlsx_embed_cells_images(file)
            if len(image_dict) > 0:
                save_image(image_dict.values())
        except Exception as e:
            # Image extraction is best-effort; continue with text only.
            maxkb_logger.error(f'Exception: {e}')
            image_dict = {}
        md_tables = []
        for sheetname in workbook.sheetnames:
            sheet = workbook[sheetname] if sheetname else workbook.active
            rows = self.fill_merged_cells(sheet, image_dict)
            if len(rows) == 0:
                continue
            # Header captions are the keys of the row dicts; a raw newline
            # would split the Markdown header line.
            headers = [f"{key}".replace('\n', '<br>') for key in rows[0]]
            table_lines = [
                '| ' + ' | '.join(headers) + ' |\n',
                '| ' + ' | '.join(['---'] * len(headers)) + ' |\n',
            ]
            for row in rows:
                # Test for None BEFORE converting to text; formatting first
                # would turn every empty cell into the literal string "None".
                cells = [
                    '' if value is None else str(value).replace('\n', '<br>')
                    for value in row.values()
                ]
                table_lines.append('| ' + ' | '.join(cells) + ' |\n')
            md_tables.append(''.join(table_lines) + '\n\n')
        return ''.join(md_tables)
    except Exception as e:
        maxkb_logger.error(f'excel split handle error: {e}')
        return f'error: {e}'
def support(self, file, get_buffer):
file_name: str = file.name.lower()

View File

@ -165,4 +165,69 @@ class ZipSplitHandle(BaseSplitHandle):
return False
def get_content(self, file, save_image):
    """
    Extract the entries of a zip archive and return their text joined by
    blank lines.

    Image entries (recognised by file extension) are collected as ``File``
    models and handed to ``save_image`` once at the end; they contribute no
    text. Every other entry is offered to the registered ``split_handles``;
    the first one whose ``support`` accepts it produces the text. Entries
    that yield no text that way are decoded as plain text.

    :param file: file-like object with ``read()``, or the raw archive bytes
    :param save_image: callback that receives the list of collected images
    :return: the non-blank entry texts joined with two newlines
    """
    buffer = file.read() if hasattr(file, 'read') else None
    bytes_io = io.BytesIO(buffer) if buffer is not None else io.BytesIO(file)
    md_parts = []
    image_mode_list = []

    def is_image_name(name: str):
        ext = os.path.splitext(name.lower())[1]
        return ext in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.svg')

    with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
        for inner_name in zip_ref.namelist():
            # Skip directory entries and macOS resource-fork metadata.
            if inner_name.endswith('/') or inner_name.startswith('__MACOSX'):
                continue
            with zip_ref.open(inner_name) as zf:
                try:
                    real_name = get_file_name(zf.name)
                except Exception:
                    # Fall back to the name exactly as stored in the archive.
                    real_name = zf.name
                raw = zf.read()

            # Images are only collected; no binary data goes into the text.
            if is_image_name(real_name):
                image_id = str(uuid.uuid7())
                fmodel = File(
                    id=image_id,
                    file_name=os.path.basename(real_name),
                    meta={'debug': False, 'content': raw}
                )
                image_mode_list.append(fmodel)
                continue

            # Give the split handles a seekable, named file-like object.
            inner_file = io.BytesIO(raw)
            inner_file.name = real_name

            # One buffer callback per entry; the default argument binds
            # this entry's bytes.
            def get_buffer(_file, _raw=raw):
                return _raw

            md_text = None
            for split_handle in split_handles:
                if split_handle.support(inner_file, get_buffer):
                    # support() may have consumed the stream; rewind first.
                    inner_file.seek(0)
                    md_text = split_handle.get_content(inner_file, save_image)
                    break

            # Fallback: decode as text. NOTE: a handler that returned None
            # is treated the same as "no handler matched".
            if md_text is None:
                enc = detect(raw).get('encoding') or 'utf-8'
                try:
                    md_text = raw.decode(enc, errors='ignore')
                except Exception:
                    md_text = raw.decode('utf-8', errors='ignore')

            if isinstance(md_text, str) and md_text.strip():
                md_parts.append(md_text)

    # Persist all collected images in a single callback call.
    if image_mode_list:
        save_image(image_mode_list)
    return '\n\n'.join(md_parts)