mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 18:32:48 +00:00
172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
# coding=utf-8
|
||
"""
|
||
@project: maxkb
|
||
@Author:虎
|
||
@file: xlsx_parse_qa_handle.py
|
||
@date:2024/5/21 14:59
|
||
@desc:
|
||
"""
|
||
import io
|
||
import traceback
|
||
from typing import List
|
||
|
||
import openpyxl
|
||
from openpyxl import load_workbook
|
||
|
||
from common.handle.base_split_handle import BaseSplitHandle
|
||
from common.handle.impl.common_handle import xlsx_embed_cells_images
|
||
from common.utils.logger import maxkb_logger
|
||
|
||
splitter = '\n`-----------------------------------`\n'
|
||
|
||
|
||
def post_cell(image_dict, cell_value):
|
||
image = image_dict.get(cell_value, None)
|
||
if image is not None:
|
||
return f''
|
||
return cell_value.replace('\n', '<br>').replace('|', '|')
|
||
|
||
|
||
def row_to_md(row, image_dict):
|
||
return '| ' + ' | '.join(
|
||
[post_cell(image_dict, str(cell.value if cell.value is not None else '')) if cell is not None else '' for cell
|
||
in row]) + ' |\n'
|
||
|
||
|
||
def handle_sheet(file_name, sheet, image_dict, limit: int):
|
||
rows = sheet.rows
|
||
paragraphs = []
|
||
result = {'name': file_name, 'content': paragraphs}
|
||
try:
|
||
title_row_list = next(rows)
|
||
title_md_content = row_to_md(title_row_list, image_dict)
|
||
title_md_content += '| ' + ' | '.join(
|
||
['---' if cell is not None else '' for cell in title_row_list]) + ' |\n'
|
||
except Exception as e:
|
||
return result
|
||
if len(title_row_list) == 0:
|
||
return result
|
||
result_item_content = ''
|
||
for row in rows:
|
||
next_md_content = row_to_md(row, image_dict)
|
||
next_md_content_len = len(next_md_content)
|
||
result_item_content_len = len(result_item_content)
|
||
if len(result_item_content) == 0:
|
||
result_item_content += title_md_content
|
||
result_item_content += next_md_content
|
||
else:
|
||
if result_item_content_len + next_md_content_len < limit:
|
||
result_item_content += next_md_content
|
||
else:
|
||
paragraphs.append({'content': result_item_content, 'title': ''})
|
||
result_item_content = title_md_content + next_md_content
|
||
if len(result_item_content) > 0:
|
||
paragraphs.append({'content': result_item_content, 'title': ''})
|
||
return result
|
||
|
||
|
||
class XlsxSplitHandle(BaseSplitHandle):
|
||
def fill_merged_cells(self, sheet, image_dict):
|
||
data = []
|
||
|
||
# 获取第一行作为标题行
|
||
headers = []
|
||
for idx, cell in enumerate(sheet[1]):
|
||
if cell.value is None:
|
||
headers.append(' ' * (idx + 1))
|
||
else:
|
||
headers.append(cell.value)
|
||
|
||
# 从第二行开始遍历每一行
|
||
for row in sheet.iter_rows(min_row=2, values_only=False):
|
||
row_data = {}
|
||
for col_idx, cell in enumerate(row):
|
||
cell_value = cell.value
|
||
|
||
# 如果单元格为空,并且该单元格在合并单元格内,获取合并单元格的值
|
||
if cell_value is None:
|
||
for merged_range in sheet.merged_cells.ranges:
|
||
if cell.coordinate in merged_range:
|
||
cell_value = sheet[merged_range.min_row][merged_range.min_col - 1].value
|
||
break
|
||
|
||
image = image_dict.get(cell_value, None)
|
||
if image is not None:
|
||
cell_value = f''
|
||
|
||
# 使用标题作为键,单元格的值作为值存入字典
|
||
row_data[headers[col_idx]] = cell_value
|
||
data.append(row_data)
|
||
|
||
return data
|
||
|
||
def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
|
||
buffer = get_buffer(file)
|
||
try:
|
||
if type(limit) is str:
|
||
limit = int(limit)
|
||
workbook = openpyxl.load_workbook(io.BytesIO(buffer))
|
||
try:
|
||
image_dict: dict = xlsx_embed_cells_images(io.BytesIO(buffer))
|
||
save_image([item for item in image_dict.values()])
|
||
except Exception as e:
|
||
image_dict = {}
|
||
worksheets = workbook.worksheets
|
||
worksheets_size = len(worksheets)
|
||
return [row for row in
|
||
[handle_sheet(file.name,
|
||
sheet,
|
||
image_dict,
|
||
limit) if worksheets_size == 1 and sheet.title == 'Sheet1' else handle_sheet(
|
||
sheet.title, sheet, image_dict, limit) for sheet
|
||
in worksheets] if row is not None]
|
||
except Exception as e:
|
||
maxkb_logger.error(f"Error processing XLSX file {file.name}: {e}, {traceback.format_exc()}")
|
||
return [{'name': file.name, 'content': []}]
|
||
|
||
def get_content(self, file, save_image):
|
||
try:
|
||
# 加载 Excel 文件
|
||
workbook = load_workbook(file)
|
||
try:
|
||
image_dict: dict = xlsx_embed_cells_images(file)
|
||
if len(image_dict) > 0:
|
||
save_image(image_dict.values())
|
||
except Exception as e:
|
||
maxkb_logger.error(f'Exception: {e}')
|
||
image_dict = {}
|
||
md_tables = ''
|
||
# 遍历所有工作表
|
||
for sheetname in workbook.sheetnames:
|
||
sheet = workbook[sheetname]
|
||
rows = self.fill_merged_cells(sheet, image_dict)
|
||
if len(rows) == 0:
|
||
continue
|
||
|
||
# 添加 sheet 名称作为标题
|
||
md_tables += f'## {sheetname}\n\n'
|
||
|
||
# 提取表头和内容
|
||
headers = [f"{key}" for key, value in rows[0].items()]
|
||
|
||
# 构建 Markdown 表格
|
||
md_table = '| ' + ' | '.join(headers) + ' |\n'
|
||
md_table += '| ' + ' | '.join(['---'] * len(headers)) + ' |\n'
|
||
for row in rows:
|
||
r = [f'{value}' if value is not None else '' for key, value in row.items()]
|
||
md_table += '| ' + ' | '.join(
|
||
[str(cell).replace('\n', '<br>') if cell is not None else '' for cell in r]) + ' |\n'
|
||
|
||
md_tables += md_table + '\n\n'
|
||
|
||
return md_tables
|
||
except Exception as e:
|
||
maxkb_logger.error(f'excel split handle error: {e}')
|
||
return f'error: {e}'
|
||
|
||
def support(self, file, get_buffer):
|
||
file_name: str = file.name.lower()
|
||
if file_name.endswith(".xlsx"):
|
||
return True
|
||
return False
|