MaxKB/apps/common/handle/impl/qa/xlsx_parse_qa_handle.py
shaohuzhang1 28938104c0
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run
* feat: 支持上传 Excel/CSV 类型的问答对 (#430)
2024-05-23 18:57:49 +08:00

57 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file xlsx_parse_qa_handle.py
@date2024/5/21 14:59
@desc:
"""
import io
import openpyxl
from common.handle.base_parse_qa_handle import BaseParseQAHandle
def _cell_text(row, index):
    """Return the text of ``row[index]``, or ``''`` when there is nothing to read.

    ``index`` is ``None`` when the sheet has no column for the requested field;
    an empty cell carries ``value is None``. Both cases yield an empty string
    instead of raising or producing the literal text ``'None'``.
    """
    if index is None or index >= len(row):
        return ''
    value = row[index].value
    return '' if value is None else str(value)


def handle_sheet(file_name, sheet):
    """Convert one worksheet into a document dict of paragraphs.

    The first row is treated as the header. Columns are located by the prefix
    of their header text: '分段标题' (title), '分段内容' (content) and '问题'
    (questions, one per line inside the cell). If several columns share a
    prefix, the last one wins.

    :param file_name: name to store under the ``'name'`` key of the result
    :param sheet: worksheet-like object whose ``rows`` yields sequences of
                  cells exposing a ``value`` attribute
    :return: ``None`` when the sheet has no rows at all, otherwise
             ``{'name': file_name, 'paragraphs': [...]}`` where each paragraph
             is ``{'title': str, 'content': str, 'problem_list': [{'content': str}]}``.
             Titles and questions are cut to 255 characters, content to 4096.
             Rows without content are skipped.
    """
    rows = iter(sheet.rows)
    header = next(rows, None)
    if header is None:
        # Completely empty sheet: nothing to import.
        return None

    prefix_to_field = (
        ('分段标题', 'title'),
        ('分段内容', 'content'),
        ('问题', 'problem_list'),
    )
    column_of = {}
    for index, cell in enumerate(header):
        header_text = str(cell.value)
        for prefix, field in prefix_to_field:
            if header_text.startswith(prefix):
                column_of[field] = index

    paragraph_list = []
    for row in rows:
        content = _cell_text(row, column_of.get('content'))
        if not content:
            # Blank rows (or a sheet without a content column) would otherwise
            # turn into junk paragraphs.
            continue
        title = _cell_text(row, column_of.get('title'))
        problem = _cell_text(row, column_of.get('problem_list'))
        problem_list = [{'content': p[0:255]} for p in problem.split('\n') if p.strip()]
        paragraph_list.append({'title': title[0:255],
                               'content': content[0:4096],
                               'problem_list': problem_list})
    return {'name': file_name, 'paragraphs': paragraph_list}
class XlsxParseQAHandle(BaseParseQAHandle):
    """Question/answer import handler for ``.xlsx`` workbooks."""

    def support(self, file, get_buffer):
        """Return True when the uploaded file's name ends with ``.xlsx`` (case-insensitive).

        ``get_buffer`` is accepted but not used here.
        """
        return file.name.lower().endswith(".xlsx")

    def handle(self, file, get_buffer):
        """Parse every worksheet of the workbook into a document dict.

        :param file: uploaded file object; its ``name`` is used as the document
                     name when the workbook has a single sheet titled 'Sheet1'
                     (presumably an untouched default sheet name)
        :param get_buffer: callable returning the raw bytes of ``file``
        :return: list of the non-None results of ``handle_sheet``, in sheet order
        """
        raw = get_buffer(file)
        book = openpyxl.load_workbook(io.BytesIO(raw))
        sheets = book.worksheets
        single_sheet = len(sheets) == 1

        documents = []
        for sheet in sheets:
            if single_sheet and sheet.title == 'Sheet1':
                document_name = file.name
            else:
                document_name = sheet.title
            parsed = handle_sheet(document_name, sheet)
            if parsed is not None:
                documents.append(parsed)
        return documents