fix: add retry mechanism for file uploads and response handling in image.py

This commit is contained in:
wxg0103 2025-10-30 15:34:41 +08:00
parent 775aa9148c
commit ab434b0a9e
3 changed files with 70 additions and 35 deletions

View File

@ -124,20 +124,20 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode):
if self.node.id == val['node_id'] and 'video_list' in val:
if val['dialogue_type'] == 'WORKFLOW':
return chat_record.get_ai_message()
return AIMessage(content=val['answer'])
return AIMessage(content=val['answer'] or '')
return chat_record.get_ai_message()
def generate_history_human_message_for_details(self, chat_record):
for data in chat_record.details.values():
if self.node.id == data['node_id'] and 'video_list' in data:
video_list = data['video_list']
if len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW':
# 增加对 None 和空列表的检查
if not video_list or len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW':
return HumanMessage(content=chat_record.problem_text)
file_id_list = [video.get('file_id') for video in video_list]
return HumanMessage(content=[
{'type': 'text', 'text': data['question']},
*[{'type': 'video_url', 'video_url': {'url': f'./oss/file/{file_id}'}} for file_id in file_id_list]
])
return HumanMessage(content=chat_record.problem_text)
@ -145,7 +145,7 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode):
start_index = len(history_chat_record) - dialogue_number
history_message = reduce(lambda x, y: [*x, *y], [
[self.generate_history_human_message(history_chat_record[index], video_model),
self.generate_history_ai_message(history_chat_record[index]), video_model]
self.generate_history_ai_message(history_chat_record[index])]
for index in
range(start_index if start_index > 0 else 0, len(history_chat_record))], [])
return history_message

View File

@ -1,5 +1,6 @@
# coding=utf-8
import datetime
import time
from typing import Dict, Optional, Any, Iterator
import requests
@ -81,13 +82,24 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI):
return f"oss://{key}"
def upload_file_and_get_url(self, file_stream, file_name):
"""上传文件并获取URL"""
# 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败
policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name)
# 2. 上传文件到OSS
oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name)
max_retries = 3
return oss_url
retry_delay = 1 # 初始重试延迟(秒)
for attempt in range(max_retries):
try:
# 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败
policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name)
# 2. 上传文件到OSS
oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name)
return oss_url
except Exception as e:
if attempt < max_retries - 1:
# 指数退避策略
time.sleep(retry_delay * (2 ** attempt))
continue
else:
raise Exception(f"文件上传失败,已重试{max_retries}次: {str(e)}")
def stream(
self,
@ -129,32 +141,54 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI):
**self.extra_body,
"stream": True,
}
response = requests.post(url, headers=headers, json=data, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to get response: {response.text}")
for line in response.iter_lines():
if line:
try:
decoded_line = line.decode('utf-8')
# 检查是否是有效的SSE数据行
if decoded_line.startswith('data: '):
# 提取JSON部分
json_str = decoded_line[6:] # 移除 'data: ' 前缀
# 检查是否是结束标记
if json_str.strip() == '[DONE]':
# 增加重试机制
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
response = requests.post(url, headers=headers, json=data, stream=True, timeout=30)
if response.status_code != 200:
raise Exception(f"Failed to get response: {response.text}")
for line in response.iter_lines():
if line:
try:
decoded_line = line.decode('utf-8')
# 检查是否是有效的SSE数据行
if decoded_line.startswith('data: '):
# 提取JSON部分
json_str = decoded_line[6:] # 移除 'data: ' 前缀
# 检查是否是结束标记
if json_str.strip() == '[DONE]':
continue
# 尝试解析JSON
chunk_data = json.loads(json_str)
if 'choices' in chunk_data and chunk_data['choices']:
delta = chunk_data['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
yield AIMessage(content=content)
except json.JSONDecodeError:
# 忽略无法解析的行
continue
except Exception as e:
# 处理其他可能的异常
continue
break # 成功执行则退出重试循环
# 尝试解析JSON
chunk_data = json.loads(json_str)
if 'choices' in chunk_data and chunk_data['choices']:
delta = chunk_data['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
yield AIMessage(content=content)
except json.JSONDecodeError:
# 忽略无法解析的行
except (requests.exceptions.ProxyError, requests.exceptions.ConnectionError) as e:
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt)) # 指数退避
continue
except Exception as e:
# 处理其他可能的异常
else:
raise Exception(f"网络连接失败,已重试{max_retries}次: {str(e)}")
except Exception as e:
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt))
continue
else:
raise Exception(f"请求失败,已重试{max_retries}次: {str(e)}")

View File

@ -32,6 +32,7 @@ class VolcanicEngineImage(MaxKBBaseModel, BaseChatOpenAI):
return f'data:{video_format};base64,{base64_video}'
def get_video_format(file_name):
extension = file_name.split('.')[-1].lower()
format_map = {