# coding=utf-8 """ @project: maxkb @Author:虎 @file: text_split_handle.py @date:2024/3/27 18:19 @desc: """ import os import re import tempfile import time import traceback from typing import List import fitz from django.utils.translation import gettext_lazy as _ from langchain_community.document_loaders import PyPDFLoader from common.handle.base_split_handle import BaseSplitHandle from common.utils.logger import maxkb_logger from common.utils.split_model import SplitModel, smart_split_paragraph default_pattern_list = [re.compile('(?<=^)# .*|(?<=\\n)# .*'), re.compile('(?<=\\n)(? 0: return {'name': file.name, 'content': result} # 没有目录的pdf content = self.handle_pdf_content(file, pdf_document) if pattern_list is not None and len(pattern_list) > 0: split_model = SplitModel(pattern_list, with_filter, limit) else: split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit) except BaseException as e: maxkb_logger.error(f"File: {file.name}, error: {e}, {traceback.format_exc()}") return { 'name': file.name, 'content': [] } finally: pdf_document.close() # 处理完后可以删除临时文件 os.remove(temp_file_path) return { 'name': file.name, 'content': split_model.parse(content) } @staticmethod def handle_pdf_content(file, pdf_document): # 第一步:收集所有字体大小 font_sizes = [] for page_num in range(len(pdf_document)): page = pdf_document.load_page(page_num) blocks = page.get_text("dict")["blocks"] for block in blocks: if block["type"] == 0: for line in block["lines"]: for span in line["spans"]: if span["size"] > 0: font_sizes.append(span["size"]) # 计算正文字体大小(众数) if not font_sizes: body_font_size = 12 else: from collections import Counter body_font_size = Counter(font_sizes).most_common(1)[0][0] # 第二步:提取内容 content = "" for page_num in range(len(pdf_document)): start_time = time.time() page = pdf_document.load_page(page_num) blocks = page.get_text("dict")["blocks"] for block in blocks: if block["type"] == 0: # 文本块 for line in block["lines"]: if not line["spans"]: continue text = "".join([span["text"] for span in line["spans"]]) font_size = line["spans"][0]["size"] # 根据与正文字体的差值判断 size_diff = font_size - body_font_size if size_diff > 2: # 明显大于正文 content += f"## {text}\n\n" elif size_diff > 0.5: # 略大于正文 content += f"### {text}\n\n" else: # 正文 content += f"{text}\n" elif block["type"] == 1: # 图片块 content += f"![image](image_{page_num}_{block['number']})\n\n" content = content.replace('\0', '') elapsed_time = time.time() - start_time maxkb_logger.debug( f"File: {file.name}, Page: {page_num + 1}, Time: {elapsed_time:.3f}s") return content @staticmethod def handle_toc(doc, limit): # 找到目录 toc = doc.get_toc() if toc is None or len(toc) == 0: return None # 创建存储章节内容的数组 chapters = [] # 遍历目录并按章节提取文本 for i, entry in enumerate(toc): level, title, start_page = entry start_page -= 1 # PyMuPDF 页码从 0 开始,书签页码从 1 开始 chapter_title = title # 确定结束页码,如果是最后一个章节则到文档末尾 if i + 1 < len(toc): end_page = toc[i + 1][2] - 1 else: end_page = doc.page_count - 1 # 去掉标题中的符号 title = PdfSplitHandle.handle_chapter_title(title) # 提取该章节的文本内容 chapter_text = "" for page_num in range(start_page, end_page + 1): page = doc.load_page(page_num) # 加载页面 text = page.get_text("text") text = re.sub(r'(? -1: text = text[idx + len(title):] if i + 1 < len(toc): l, next_title, next_start_page = toc[i + 1] next_title = PdfSplitHandle.handle_chapter_title(next_title) # print(f'next_title: {next_title}') idx = text.find(next_title) if idx > -1: text = text[:idx] chapter_text += text # 提取文本 # Null characters are not allowed. chapter_text = chapter_text.replace('\0', '') # 限制标题长度 real_chapter_title = chapter_title[:256] # 限制章节内容长度 if 0 < limit < len(chapter_text): split_text = smart_split_paragraph(chapter_text, limit) for text in split_text: chapters.append({"title": real_chapter_title, "content": text}) else: chapters.append( {"title": real_chapter_title, "content": chapter_text if chapter_text else real_chapter_title}) # 保存章节内容和章节标题 return chapters @staticmethod def handle_links(doc, pattern_list, with_filter, limit): # 检查文档是否包含内部链接 if not check_links_in_pdf(doc): return # 创建存储章节内容的数组 chapters = [] toc_start_page = -1 page_content = "" handle_pre_toc = True # 遍历 PDF 的每一页,查找带有目录链接的页 for page_num in range(doc.page_count): page = doc.load_page(page_num) links = page.get_links() # 如果目录开始页码未设置,则设置为当前页码 if len(links) > 0: toc_start_page = page_num if toc_start_page < 0: page_content += page.get_text('text') # 检查该页是否包含内部链接(即指向文档内部的页面) for num in range(len(links)): link = links[num] if link['kind'] == 1: # 'kind' 为 1 表示内部链接 # 获取链接目标的页面 dest_page = link['page'] rect = link['from'] # 获取链接的矩形区域 # 如果目录开始页码包括前言部分,则不处理前言部分 if dest_page < toc_start_page: handle_pre_toc = False # 提取链接区域的文本作为标题 link_title = page.get_text("text", clip=rect).strip().split("\n")[0].replace('.', '').strip() # print(f'link_title: {link_title}') # 提取目标页面内容作为章节开始 start_page = dest_page end_page = dest_page # 下一个link next_link = links[num + 1] if num + 1 < len(links) else None next_link_title = None if next_link is not None and next_link['kind'] == 1: rect = next_link['from'] next_link_title = page.get_text("text", clip=rect).strip() \ .split("\n")[0].replace('.', '').strip() # print(f'next_link_title: {next_link_title}') end_page = next_link['page'] # 提取章节内容 chapter_text = "" for p_num in range(start_page, end_page + 1): p = doc.load_page(p_num) text = p.get_text("text") text = re.sub(r'(? -1: text = text[idx + len(link_title):] if next_link_title is not None: idx = text.find(next_link_title) if idx > -1: text = text[:idx] chapter_text += text # Null characters are not allowed. chapter_text = chapter_text.replace('\0', '') # 限制章节内容长度 if 0 < limit < len(chapter_text): split_text = smart_split_paragraph(chapter_text, limit) for text in split_text: chapters.append({"title": link_title, "content": text}) else: # 保存章节信息 chapters.append({"title": link_title, "content": chapter_text}) # 目录中没有前言部分,手动处理 if handle_pre_toc: pre_toc = [] lines = page_content.strip().split('\n') try: for line in lines: if re.match(r'^前\s*言', line): pre_toc.append({'title': line, 'content': ''}) else: pre_toc[-1]['content'] += line for i in range(len(pre_toc)): pre_toc[i]['content'] = re.sub(r'(? 0: split_model = SplitModel(pattern_list, with_filter, limit) else: split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit) # 插入目录前的部分 page_content = re.sub(r'(?