MaxKB/apps/common/handle/impl/text/csv_split_handle.py

78 lines
2.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file csv_parse_qa_handle.py
@date2024/5/21 14:59
@desc:
"""
import csv
import io
import os
import traceback
from typing import List
from charset_normalizer import detect
from common.handle.base_split_handle import BaseSplitHandle
from common.utils.logger import maxkb_logger
def post_cell(cell_value):
return cell_value.replace('\n', '<br>').replace('|', '&#124;')
def row_to_md(row):
return '| ' + ' | '.join(
[post_cell(cell) if cell is not None else '' for cell in row]) + ' |\n'
class CsvSplitHandle(BaseSplitHandle):
def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
buffer = get_buffer(file)
paragraphs = []
file_name = os.path.basename(file.name)
result = {'name': file_name, 'content': paragraphs}
try:
if type(limit) is str:
limit = int(limit)
reader = csv.reader(io.TextIOWrapper(io.BytesIO(buffer), encoding=detect(buffer)['encoding']))
try:
title_row_list = reader.__next__()
title_md_content = row_to_md(title_row_list)
title_md_content += '| ' + ' | '.join(
['---' if cell is not None else '' for cell in title_row_list]) + ' |\n'
except Exception as e:
return result
if len(title_row_list) == 0:
return result
result_item_content = ''
for row in reader:
next_md_content = row_to_md(row)
next_md_content_len = len(next_md_content)
result_item_content_len = len(result_item_content)
if len(result_item_content) == 0:
result_item_content += title_md_content
result_item_content += next_md_content
else:
if result_item_content_len + next_md_content_len < limit:
result_item_content += next_md_content
else:
paragraphs.append({'content': result_item_content, 'title': ''})
result_item_content = title_md_content + next_md_content
if len(result_item_content) > 0:
paragraphs.append({'content': result_item_content, 'title': ''})
return result
except Exception as e:
maxkb_logger.error(f"Error processing CSV file {file.name}: {e}, {traceback.format_exc()}")
return result
def get_content(self, file, save_image):
pass
def support(self, file, get_buffer):
file_name: str = file.name.lower()
if file_name.endswith(".csv"):
return True
return False