MaxKB/apps/common/handle/impl/table/xlsx_parse_table_handle.py
shaohuzhang1 35b662a52d
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run
perf: Optimize document extraction for complex table files (#3116)
2025-05-20 13:44:20 +08:00

108 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import io
import logging
from openpyxl import load_workbook
from common.handle.base_parse_table_handle import BaseParseTableHandle
from common.handle.impl.tools import xlsx_embed_cells_images
# Logger registered under the name "max_kb"; parsing failures in this module are reported through it.
max_kb = logging.getLogger("max_kb")
class XlsxSplitHandle(BaseParseTableHandle):
    """Turn ``.xlsx`` workbooks into paragraph records or Markdown tables.

    Merged ranges are expanded so that every cell of a range carries the value
    of the range's top-left cell, and any cell whose value is a key of the
    embedded-image mapping is replaced by a Markdown image link.
    """

    def support(self, file, get_buffer):
        """Return ``True`` when ``file.name`` ends with ``.xlsx`` (case-insensitive).

        ``get_buffer`` is accepted for interface compatibility and is not used.
        """
        file_name: str = file.name.lower()
        return file_name.endswith('.xlsx')

    @staticmethod
    def _md_cell(value):
        """Render one cell for a Markdown table row.

        ``None`` becomes an empty string and line breaks become ``<br>`` so a
        cell can never split a table row across several lines.
        """
        if value is None:
            return ''
        return f'{value}'.replace('\n', '<br>')

    def fill_merged_cells(self, sheet, image_dict):
        """Return the worksheet as a list of rows with merged ranges filled in.

        Args:
            sheet: worksheet providing ``iter_rows`` and ``merged_cells.ranges``.
            image_dict: mapping looked up by raw cell value; when a value is
                found, the cell is replaced by ``![](/api/image/<image.id>)``.

        Returns:
            A list with one list per worksheet row (the first row included),
            holding the cell values after image substitution and merge filling.
        """
        data = []
        # Collect every row, starting with the first one (the header row).
        for row in sheet.iter_rows(values_only=False):
            row_data = []
            for cell in row:
                cell_value = cell.value
                image = image_dict.get(cell_value, None)
                if image is not None:
                    cell_value = f'![](/api/image/{image.id})'
                row_data.append(cell_value)
            data.append(row_data)

        # Copy the top-left value of each merged range into all of its cells.
        # Range coordinates are 1-based while ``data`` is 0-based.
        for merged_range in sheet.merged_cells.ranges:
            first_row = merged_range.min_row - 1
            first_col = merged_range.min_col - 1
            # A range anchored outside the collected grid has nothing to copy;
            # skip it instead of failing the whole sheet with an IndexError.
            if first_row >= len(data) or first_col >= len(data[first_row]):
                continue
            cell_value = data[first_row][first_col]
            last_row = min(merged_range.max_row, len(data))
            for row_index in range(first_row, last_row):
                row_data = data[row_index]
                last_col = min(merged_range.max_col, len(row_data))
                for col_index in range(first_col, last_col):
                    row_data[col_index] = cell_value
        return data

    def handle(self, file, get_buffer, save_image):
        """Split every sheet of the workbook into ``header: value`` paragraphs.

        Args:
            file: uploaded file object; ``file.name`` is used in the fallback
                result.
            get_buffer: callable returning the file content as bytes-like data.
            save_image: callable receiving the list of embedded images.

        Returns:
            One ``{'name': <sheet name>, 'paragraphs': [...]}`` dict per sheet.
            Each data row becomes ``{'title': '', 'content': 'h1: v1; h2: v2'}``
            using the first row as headers; sheets with fewer than two rows
            yield no paragraphs. If parsing fails, the error is logged and
            ``[{'name': file.name, 'paragraphs': []}]`` is returned.
        """
        buffer = get_buffer(file)
        try:
            wb = load_workbook(io.BytesIO(buffer))
            try:
                image_dict: dict = xlsx_embed_cells_images(io.BytesIO(buffer))
                save_image(list(image_dict.values()))
            except Exception as e:
                # Image extraction is best-effort: continue without images.
                max_kb.debug(f'excel embedded image extraction skipped: {e}')
                image_dict = {}
            result = []
            for sheetname in wb.sheetnames:
                paragraphs = []
                ws = wb[sheetname]
                data = self.fill_merged_cells(ws, image_dict)
                if len(data) >= 2:
                    head_list = data[0]
                    for row in data[1:]:
                        row_output = "; ".join(
                            f"{head}: {value}" for head, value in zip(head_list, row))
                        paragraphs.append({'title': '', 'content': row_output})
                result.append({'name': sheetname, 'paragraphs': paragraphs})
        except Exception as e:
            # ``Exception`` rather than ``BaseException`` so KeyboardInterrupt
            # and SystemExit still propagate.
            max_kb.error(f'excel split handle error: {e}')
            return [{'name': file.name, 'paragraphs': []}]
        return result

    def get_content(self, file, save_image):
        """Render every non-empty sheet of the workbook as a Markdown table.

        Args:
            file: file-like object holding the workbook.
            save_image: callable receiving the embedded images; only called
                when at least one image was found.

        Returns:
            The Markdown tables concatenated, each followed by a blank
            separator, with ``/api/image/`` links rewritten to ``/api/file/``.
            Empty cells render as empty strings. If parsing fails, the error is
            logged and the string ``'error: <message>'`` is returned.
        """
        try:
            # Load the Excel file.
            workbook = load_workbook(file)
            try:
                image_dict: dict = xlsx_embed_cells_images(file)
                if len(image_dict) > 0:
                    save_image(image_dict.values())
            except Exception as e:
                # Image extraction is best-effort: continue without images.
                max_kb.debug(f'excel embedded image extraction skipped: {e}')
                image_dict = {}
            tables = []
            for sheetname in workbook.sheetnames:
                # Fall back to the active sheet when the sheet name is empty.
                sheet = workbook[sheetname] if sheetname else workbook.active
                data = self.fill_merged_cells(sheet, image_dict)
                if len(data) == 0:
                    continue
                # The first row is the header; the remaining rows are the body.
                headers = [self._md_cell(value) for value in data[0]]
                lines = [
                    '| ' + ' | '.join(headers) + ' |\n',
                    '| ' + ' | '.join(['---'] * len(headers)) + ' |\n',
                ]
                for row in data[1:]:
                    cells = [self._md_cell(value) for value in row]
                    lines.append('| ' + ' | '.join(cells) + ' |\n')
                tables.append(''.join(lines) + '\n\n')
            md_tables = ''.join(tables)
            return md_tables.replace('/api/image/', '/api/file/')
        except Exception as e:
            max_kb.error(f'excel split handle error: {e}')
            return f'error: {e}'