# coding=utf-8
"""
    @project: MaxKB
    @Author:虎
    @file: tools.py
    @date:2024/9/11 16:41
    @desc: Helpers for extracting images embedded in xlsx cells
           (the ``xl/cellimages.xml`` part) and mapping them to the
           worksheet formulas that reference them.
"""
import io
import traceback
from collections import deque
from functools import reduce
from io import BytesIO
from xml.etree.ElementTree import fromstring
from zipfile import ZipFile

import uuid_utils.compat as uuid
from PIL import Image as PILImage
from openpyxl.drawing.image import Image as openpyxl_Image
from openpyxl.packaging.relationship import get_rels_path, get_dependents
from openpyxl.xml.constants import SHEET_DRAWING_NS, REL_NS, SHEET_MAIN_NS

from common.utils.logger import maxkb_logger
from knowledge.models import File
from PIL import ImageFile

# Module-level PIL configuration: tolerate truncated image files and
# disable Pillow's maximum-pixel-count check.
ImageFile.LOAD_TRUNCATED_IMAGES = True
PILImage.MAX_IMAGE_PIXELS = None


def parse_element(element) -> dict:
    """Map picture names to relationship ids found in a drawing XML tree.

    Every node that has an ``nvPicPr`` child (spreadsheet-drawing namespace)
    is inspected: the ``name`` attribute of the first child of ``nvPicPr``
    becomes the key, and the ``embed`` attribute (relationships namespace)
    of the first child of the sibling ``blipFill`` becomes the value.

    :param element: root ``Element`` of the parsed XML document.
    :return: ``{picture_name: embed_rel_id}``; entries with an empty picture
        name are skipped, and the value is ``""`` when no ``blipFill`` exists.
    """
    xdr_namespace = "{%s}" % SHEET_DRAWING_NS
    nv_pic_tag = xdr_namespace + "nvPicPr"
    blip_fill_tag = xdr_namespace + "blipFill"
    rel_embed_attr = "{%s}embed" % REL_NS

    data = {}
    for target in level_order_traversal(element, nv_pic_tag):
        pic_name = embed = ""
        for child in target:
            if child.tag == nv_pic_tag:
                pic_name = child[0].attrib["name"]
            elif child.tag == blip_fill_tag:
                embed = child[0].attrib[rel_embed_attr]
        if pic_name:
            data[pic_name] = embed
    return data


def parse_element_sheet_xml(element) -> list:
    """Collect the text of every ``f`` element in a worksheet XML tree.

    :param element: root ``Element`` of a parsed worksheet XML document.
    :return: list with the ``text`` of each ``f`` element (spreadsheet main
        namespace) in traversal order; an item is ``None`` when the element
        has no text.
    """
    formula_tag = "{%s}" % SHEET_MAIN_NS + "f"
    data = []
    for target in level_order_traversal(element, formula_tag):
        data.extend(child.text for child in target if child.tag == formula_tag)
    return data


def level_order_traversal(root, flag: str) -> list:
    """Breadth-first search for nodes that directly contain a ``flag`` child.

    A node with at least one direct child whose tag equals ``flag`` is
    recorded and its subtree is not searched any further.

    :param root: node to start from; only needs to be iterable over its
        children, each exposing a ``tag`` attribute.
    :param flag: fully qualified tag name to look for.
    :return: matching nodes in breadth-first order.
    """
    # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
    queue = deque([root])
    targets = []
    while queue:
        node = queue.popleft()
        if any(child.tag == flag for child in node):
            targets.append(node)
            continue
        queue.extend(node)
    return targets


def handle_images(deps, archive: ZipFile) -> list:
    """Load the image parts referenced by ``deps`` from ``archive``.

    :param deps: iterable of relationship objects exposing ``target`` (path
        inside the archive) and ``id`` (relationship id).
    :param archive: open ``ZipFile`` to read the image bytes from.
    :return: list of openpyxl ``Image`` objects, each annotated with
        ``embed`` (relationship id) and ``target`` (archive path). Images
        that cannot be read are logged and skipped.
    """
    images = []
    if not PILImage:  # Pillow not installed, drop images
        return images
    for dep in deps:
        try:
            image_io = archive.read(dep.target)
            image = openpyxl_Image(BytesIO(image_io))
        except Exception as e:
            # Best effort: one unreadable image must not abort the others.
            maxkb_logger.error(f"Error reading image {dep.target}: {e}, {traceback.format_exc()}")
            continue
        image.embed = dep.id  # relationship id (rId) of the image part
        image.target = dep.target  # path of the image inside the archive
        images.append(image)
    return images


def xlsx_embed_cells_images(buffer) -> dict:
    """Extract cell-embedded images from an xlsx file, keyed by formula.

    Reads ``xl/cellimages.xml`` and its relationships, collects the formula
    texts of every ``xl/worksheets/sheet*`` part, and for each picture whose
    name occurs inside a formula converts the image to an RGB JPEG wrapped
    in a ``File`` model instance.

    :param buffer: anything ``ZipFile`` accepts (path or file-like object).
    :return: ``{"=" + formula_text: File}``. When several formulas contain
        the same picture name, only the last one found is used as the key.
    :raises: exceptions from reading or parsing the archive are not caught
        here and propagate to the caller; the archive is closed either way.
    """
    # The context manager closes the archive even when parsing fails.
    with ZipFile(buffer) as archive:
        # Resolve the relationships of cellimages.xml and load the images.
        deps = get_dependents(archive, get_rels_path("xl/cellimages.xml"))
        image_rel = handle_images(deps=deps, archive=archive)

        # Worksheets (keyed by the suffix after "sheet") and their formulas.
        sheet_list = {}
        for item in archive.namelist():
            if not item.startswith('xl/worksheets/sheet'):
                continue
            key = item.split('/')[-1].split('.')[0].split('sheet')[-1]
            sheet_list[key] = parse_element_sheet_xml(fromstring(archive.read(item)))

        cell_images_xml = parse_element(fromstring(archive.read("xl/cellimages.xml")))

        # relationship id -> loaded image, then picture name -> loaded image.
        cell_images_rel = {image.embed: image for image in image_rel}
        images_by_name = {
            cnv: cell_images_rel.get(embed)
            for cnv, embed in cell_images_xml.items()
        }

        # Loop-invariant: every textual formula across all worksheets.
        formula_cells = [
            cell
            for sheet in sheet_list.values()
            if sheet is not None
            for cell in sheet
            if isinstance(cell, str)
        ]

        result = {}
        for key, img in images_by_name.items():
            if img is None:
                # Picture is declared but its image part could not be loaded.
                continue
            image_excel_id_list = [cell for cell in formula_cells if key in cell]
            if not image_excel_id_list:
                continue
            image_excel_id = image_excel_id_list[-1]
            img_byte = io.BytesIO()
            # Close the archive member handle as soon as the image is saved.
            with archive.open(img.target) as member:
                im = PILImage.open(member).convert('RGB')
                im.save(img_byte, format='JPEG')
            image = File(id=uuid.uuid7(), file_name=img.path,
                         meta={'debug': False, 'content': img_byte.getvalue()})
            result['=' + image_excel_id] = image
    return result