MaxKB/apps/common/handle/impl/common_handle.py

120 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: MaxKB
@Author
@file tools.py
@date2024/9/11 16:41
@desc:
"""
import io
import logging
import uuid_utils.compat as uuid
from functools import reduce
from io import BytesIO
from xml.etree.ElementTree import fromstring
from zipfile import ZipFile
from PIL import Image as PILImage
from openpyxl.drawing.image import Image as openpyxl_Image
from openpyxl.packaging.relationship import get_rels_path, get_dependents
from openpyxl.xml.constants import SHEET_DRAWING_NS, REL_NS, SHEET_MAIN_NS
from common.handle.base_parse_qa_handle import get_title_row_index_dict, get_row_value
from common.utils.logger import maxkb_logger
from knowledge.models import File
def parse_element(element) -> {}:
data = {}
xdr_namespace = "{%s}" % SHEET_DRAWING_NS
targets = level_order_traversal(element, xdr_namespace + "nvPicPr")
for target in targets:
cNvPr = embed = ""
for child in target:
if child.tag == xdr_namespace + "nvPicPr":
cNvPr = child[0].attrib["name"]
elif child.tag == xdr_namespace + "blipFill":
_rel_embed = "{%s}embed" % REL_NS
embed = child[0].attrib[_rel_embed]
if cNvPr:
data[cNvPr] = embed
return data
def parse_element_sheet_xml(element) -> []:
data = []
xdr_namespace = "{%s}" % SHEET_MAIN_NS
targets = level_order_traversal(element, xdr_namespace + "f")
for target in targets:
for child in target:
if child.tag == xdr_namespace + "f":
data.append(child.text)
return data
def level_order_traversal(root, flag: str) -> []:
queue = [root]
targets = []
while queue:
node = queue.pop(0)
children = [child.tag for child in node]
if flag in children:
targets.append(node)
continue
for child in node:
queue.append(child)
return targets
def handle_images(deps, archive: ZipFile) -> []:
images = []
if not PILImage: # Pillow not installed, drop images
return images
for dep in deps:
try:
image_io = archive.read(dep.target)
image = openpyxl_Image(BytesIO(image_io))
except Exception as e:
maxkb_logger.error(f"Error reading image {dep.target}: {e}")
continue
image.embed = dep.id # 文件rId
image.target = dep.target # 文件地址
images.append(image)
return images
def xlsx_embed_cells_images(buffer) -> {}:
archive = ZipFile(buffer)
# 解析cellImage.xml文件
deps = get_dependents(archive, get_rels_path("xl/cellimages.xml"))
image_rel = handle_images(deps=deps, archive=archive)
# 工作表及其中图片ID
sheet_list = {}
for item in archive.namelist():
if not item.startswith('xl/worksheets/sheet'):
continue
key = item.split('/')[-1].split('.')[0].split('sheet')[-1]
sheet_list[key] = parse_element_sheet_xml(fromstring(archive.read(item)))
cell_images_xml = parse_element(fromstring(archive.read("xl/cellimages.xml")))
cell_images_rel = {}
for image in image_rel:
cell_images_rel[image.embed] = image
for cnv, embed in cell_images_xml.items():
cell_images_xml[cnv] = cell_images_rel.get(embed)
result = {}
for key, img in cell_images_xml.items():
image_excel_id_list = [_xl for _xl in
reduce(lambda x, y: [*x, *y], [sheet for sheet_id, sheet in sheet_list.items()], []) if
key in _xl]
if len(image_excel_id_list) > 0:
image_excel_id = image_excel_id_list[-1]
f = archive.open(img.target)
img_byte = io.BytesIO()
im = PILImage.open(f).convert('RGB')
im.save(img_byte, format='JPEG')
image = File(id=uuid.uuid7(), file_name=img.path, meta={'debug': False, 'content': img_byte.getvalue()})
result['=' + image_excel_id] = image
archive.close()
return result