From ab434b0a9ecb59acf2f689997a5215c83003ec58 Mon Sep 17 00:00:00 2001 From: wxg0103 <727495428@qq.com> Date: Thu, 30 Oct 2025 15:34:41 +0800 Subject: [PATCH] fix: add retry mechanism for file uploads and response handling in image.py --- .../impl/base_video_understand_node.py | 8 +- .../model/image.py | 96 +++++++++++++------ .../model/image.py | 1 + 3 files changed, 70 insertions(+), 35 deletions(-) diff --git a/apps/application/flow/step_node/video_understand_step_node/impl/base_video_understand_node.py b/apps/application/flow/step_node/video_understand_step_node/impl/base_video_understand_node.py index a2916cbb1..d61104b82 100644 --- a/apps/application/flow/step_node/video_understand_step_node/impl/base_video_understand_node.py +++ b/apps/application/flow/step_node/video_understand_step_node/impl/base_video_understand_node.py @@ -124,20 +124,20 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode): if self.node.id == val['node_id'] and 'video_list' in val: if val['dialogue_type'] == 'WORKFLOW': return chat_record.get_ai_message() - return AIMessage(content=val['answer']) + return AIMessage(content=val['answer'] or '') return chat_record.get_ai_message() def generate_history_human_message_for_details(self, chat_record): for data in chat_record.details.values(): if self.node.id == data['node_id'] and 'video_list' in data: video_list = data['video_list'] - if len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW': + # 增加对 None 和空列表的检查 + if not video_list or len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW': return HumanMessage(content=chat_record.problem_text) file_id_list = [video.get('file_id') for video in video_list] return HumanMessage(content=[ {'type': 'text', 'text': data['question']}, *[{'type': 'video_url', 'video_url': {'url': f'./oss/file/{file_id}'}} for file_id in file_id_list] - ]) return HumanMessage(content=chat_record.problem_text) @@ -145,7 +145,7 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode): start_index = len(history_chat_record) - dialogue_number history_message = reduce(lambda x, y: [*x, *y], [ [self.generate_history_human_message(history_chat_record[index], video_model), - self.generate_history_ai_message(history_chat_record[index]), video_model] + self.generate_history_ai_message(history_chat_record[index])] for index in range(start_index if start_index > 0 else 0, len(history_chat_record))], []) return history_message diff --git a/apps/models_provider/impl/aliyun_bai_lian_model_provider/model/image.py b/apps/models_provider/impl/aliyun_bai_lian_model_provider/model/image.py index f514a5c06..ac4f20855 100644 --- a/apps/models_provider/impl/aliyun_bai_lian_model_provider/model/image.py +++ b/apps/models_provider/impl/aliyun_bai_lian_model_provider/model/image.py @@ -1,5 +1,6 @@ # coding=utf-8 import datetime +import time from typing import Dict, Optional, Any, Iterator import requests @@ -81,13 +82,24 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI): return f"oss://{key}" def upload_file_and_get_url(self, file_stream, file_name): - """上传文件并获取URL""" - # 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败 - policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name) - # 2. 上传文件到OSS - oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name) + max_retries = 3 - return oss_url + retry_delay = 1 # 初始重试延迟(秒) + + for attempt in range(max_retries): + try: + # 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败 + policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name) + # 2. 上传文件到OSS + oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name) + return oss_url + except Exception as e: + if attempt < max_retries - 1: + # 指数退避策略 + time.sleep(retry_delay * (2 ** attempt)) + continue + else: + raise Exception(f"文件上传失败,已重试{max_retries}次: {str(e)}") def stream( self, @@ -129,32 +141,54 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI): **self.extra_body, "stream": True, } - response = requests.post(url, headers=headers, json=data, stream=True) - if response.status_code != 200: - raise Exception(f"Failed to get response: {response.text}") - for line in response.iter_lines(): - if line: - try: - decoded_line = line.decode('utf-8') - # 检查是否是有效的SSE数据行 - if decoded_line.startswith('data: '): - # 提取JSON部分 - json_str = decoded_line[6:] # 移除 'data: ' 前缀 - # 检查是否是结束标记 - if json_str.strip() == '[DONE]': + + # 增加重试机制 + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + response = requests.post(url, headers=headers, json=data, stream=True, timeout=30) + if response.status_code != 200: + raise Exception(f"Failed to get response: {response.text}") + + for line in response.iter_lines(): + if line: + try: + decoded_line = line.decode('utf-8') + # 检查是否是有效的SSE数据行 + if decoded_line.startswith('data: '): + # 提取JSON部分 + json_str = decoded_line[6:] # 移除 'data: ' 前缀 + # 检查是否是结束标记 + if json_str.strip() == '[DONE]': + continue + + # 尝试解析JSON + chunk_data = json.loads(json_str) + + if 'choices' in chunk_data and chunk_data['choices']: + delta = chunk_data['choices'][0].get('delta', {}) + content = delta.get('content', '') + if content: + yield AIMessage(content=content) + except json.JSONDecodeError: + # 忽略无法解析的行 continue + except Exception as e: + # 处理其他可能的异常 + continue + break # 成功执行则退出重试循环 - # 尝试解析JSON - chunk_data = json.loads(json_str) - - if 'choices' in chunk_data and chunk_data['choices']: - delta = chunk_data['choices'][0].get('delta', {}) - content = delta.get('content', '') - if content: - yield AIMessage(content=content) - except json.JSONDecodeError: - # 忽略无法解析的行 + except (requests.exceptions.ProxyError, requests.exceptions.ConnectionError) as e: + if attempt < max_retries - 1: + time.sleep(retry_delay * (2 ** attempt)) # 指数退避 continue - except Exception as e: - # 处理其他可能的异常 + else: + raise Exception(f"网络连接失败,已重试{max_retries}次: {str(e)}") + except Exception as e: + if attempt < max_retries - 1: + time.sleep(retry_delay * (2 ** attempt)) continue + else: + raise Exception(f"请求失败,已重试{max_retries}次: {str(e)}") diff --git a/apps/models_provider/impl/volcanic_engine_model_provider/model/image.py b/apps/models_provider/impl/volcanic_engine_model_provider/model/image.py index 810a0b8ea..05ce9d933 100644 --- a/apps/models_provider/impl/volcanic_engine_model_provider/model/image.py +++ b/apps/models_provider/impl/volcanic_engine_model_provider/model/image.py @@ -32,6 +32,7 @@ class VolcanicEngineImage(MaxKBBaseModel, BaseChatOpenAI): return f'data:{video_format};base64,{base64_video}' + def get_video_format(file_name): extension = file_name.split('.')[-1].lower() format_map = {