From 32c18ab5aeef832cf7a96144d227f2f4db9a6da4 Mon Sep 17 00:00:00 2001 From: CaptainB Date: Fri, 5 Dec 2025 18:22:22 +0800 Subject: [PATCH] feat: add file upload functionality and enhance data source handling --- .../impl/base_document_extract_node.py | 23 ------- .../tool_lib_node/impl/base_tool_lib_node.py | 69 ++++++++++++++++++- 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/apps/application/flow/step_node/document_extract_node/impl/base_document_extract_node.py b/apps/application/flow/step_node/document_extract_node/impl/base_document_extract_node.py index edada901d..b3a317724 100644 --- a/apps/application/flow/step_node/document_extract_node/impl/base_document_extract_node.py +++ b/apps/application/flow/step_node/document_extract_node/impl/base_document_extract_node.py @@ -57,29 +57,6 @@ class BaseDocumentExtractNode(IDocumentExtractNode): document_list = [] for doc in document: - if 'file_bytes' in doc: - file_bytes = doc['file_bytes'] - # 如果是字符串,转换为字节 - if isinstance(file_bytes, str): - file_bytes = ast.literal_eval(file_bytes) - doc['file_id'] = doc.get('file_id') or uuid.uuid7() - meta = { - 'debug': False if (application_id or knowledge_id) else True, - 'chat_id': chat_id, - 'application_id': str(application_id) if application_id else None, - 'knowledge_id': str(knowledge_id) if knowledge_id else None, - 'file_id': str(doc['file_id']) - } - new_file = File( - id=doc['file_id'], - file_name=doc['name'], - file_size=len(file_bytes), - source_type=FileSourceType.APPLICATION.value if meta[ - 'application_id'] else FileSourceType.KNOWLEDGE.value, - source_id=meta['application_id'] if meta['application_id'] else meta['knowledge_id'], - meta={} - ) - new_file.save(file_bytes) file = QuerySet(File).filter(id=doc['file_id']).first() buffer = io.BytesIO(file.get_bytes()) buffer.name = doc['name'] # this is the important line diff --git a/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py b/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py index 7a0690e93..840eb5614 100644 --- a/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py +++ b/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py @@ -6,10 +6,14 @@ @date:2024/8/8 17:49 @desc: """ +import base64 +import io import json +import mimetypes import time from typing import Dict +from django.core.files.uploadedfile import InMemoryUploadedFile from django.db.models import QuerySet from django.utils.translation import gettext as _ @@ -19,7 +23,8 @@ from common.database_model_manage.database_model_manage import DatabaseModelMana from common.exception.app_exception import AppApiException from common.utils.rsa_util import rsa_long_decrypt from common.utils.tool_code import ToolExecutor -from maxkb.const import CONFIG +from knowledge.models import FileSourceType +from oss.serializers.file import FileSerializer from tools.models import Tool function_executor = ToolExecutor() @@ -126,6 +131,7 @@ def valid_function(tool_lib, workspace_id): if not tool_lib.is_active: raise Exception(_("Tool is not active")) + def _filter_file_bytes(data): """递归过滤掉所有层级的 file_bytes""" if isinstance(data, dict): @@ -136,6 +142,27 @@ def _filter_file_bytes(data): return data +def bytes_to_uploaded_file(file_bytes, file_name="unknown"): + content_type, _ = mimetypes.guess_type(file_name) + if content_type is None: + # 如果未能识别,设置为默认的二进制文件类型 + content_type = "application/octet-stream" + # 创建一个内存中的字节流对象 + file_stream = io.BytesIO(file_bytes) + + # 获取文件大小 + file_size = len(file_bytes) + + uploaded_file = InMemoryUploadedFile( + file=file_stream, + field_name=None, + name=file_name, + content_type=content_type, + size=file_size, + charset=None, + ) + return uploaded_file + class BaseToolLibNodeNode(IToolLibNode): def save_context(self, details, workflow_manage): @@ -168,12 +195,50 @@ class BaseToolLibNodeNode(IToolLibNode): else: all_params = init_params_default_value | params if self.node.properties.get('kind') == 'data-source': - all_params = {**all_params, **self.workflow_params.get('data_source')} + download_file_list = [] + download_list = function_executor.exec_code( + tool_lib.code, + {**all_params, **self.workflow_params.get('data_source')}, + function_name='get_down_file_list' + ) + for item in download_list: + result = function_executor.exec_code( + tool_lib.code, + {**all_params, **self.workflow_params.get('data_source'), + 'download_item': item}, + function_name='download' + ) + file_bytes = result.get('file_bytes', []) + chunks = [] + for chunk in file_bytes: + chunks.append(base64.b64decode(chunk)) + file = bytes_to_uploaded_file(b''.join(chunks), result.get('name')) + file_url = self.upload_knowledge_file(file) + download_file_list.append({'file_id': file_url.split('/')[-1], 'name': result.get('name')}) + all_params = { + **all_params, **self.workflow_params.get('data_source'), + 'download_file_list': download_file_list + } result = function_executor.exec_code(tool_lib.code, all_params) return NodeResult({'result': result}, (self.workflow_manage.params.get('knowledge_base') or {}) if self.node.properties.get( 'kind') == 'data-source' else {}, _write_context=write_context) + def upload_knowledge_file(self, file): + knowledge_id = self.workflow_params.get('knowledge_id') + meta = { + 'debug': False, + 'knowledge_id': knowledge_id, + } + file_url = FileSerializer(data={ + 'file': file, + 'meta': meta, + 'source_id': knowledge_id, + 'source_type': FileSourceType.KNOWLEDGE.value + }).upload() + file.close() + return file_url + def get_details(self, index: int, **kwargs): result = _filter_file_bytes(self.context.get('result'))