perf: stream response add id

This commit is contained in:
archer 2025-09-09 14:01:18 +08:00
parent 8e1b835501
commit 87e08d9ed5
No known key found for this signature in database
GPG Key ID: 4446499B846D4A9E
9 changed files with 112 additions and 74 deletions

View File

@ -83,6 +83,7 @@ export type SystemChatItemType = {
};
export type AIChatItemValueItemType = {
id?: string; // Client concat stream response
type:
| ChatItemValueTypeEnum.text
| ChatItemValueTypeEnum.reasoning

View File

@ -30,7 +30,7 @@ export type ResponseEvents = {
onStreaming?: (e: { text: string }) => void;
onReasoning?: (e: { text: string }) => void;
onToolCall?: (e: { call: ChatCompletionMessageToolCall }) => void;
onToolParam?: (e: { tool: ChatCompletionMessageToolCall; params: string }) => void;
onToolParam?: (e: { call: ChatCompletionMessageToolCall; params: string }) => void;
};
export type CreateLLMResponseProps<T extends CompletionsBodyType = CompletionsBodyType> = {
@ -254,7 +254,7 @@ export const createStreamResponse = async ({
if (currentTool && arg) {
currentTool.function.arguments += arg;
onToolParam?.({ tool: currentTool, params: arg });
onToolParam?.({ call: currentTool, params: arg });
}
}
});

View File

@ -79,7 +79,6 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
node: { nodeId, name, isEntry, version, inputs },
lang,
runtimeNodes,
runtimeEdges,
histories,
query,
requestOrigin,
@ -206,7 +205,6 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
userKey: externalProvider.openaiAccount,
isAborted: res ? () => res.closed : undefined,
getToolInfo,
onReasoning({ text }) {
@ -228,6 +226,7 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
onToolCall({ call }) {
const toolNode = getToolInfo(call.function.name);
workflowStreamResponse?.({
id: call.id,
event: SseResponseEventEnum.toolCall,
data: {
tool: {
@ -241,12 +240,13 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
}
});
},
onToolParam({ tool, params }) {
onToolParam({ call, params }) {
workflowStreamResponse?.({
id: call.id,
event: SseResponseEventEnum.toolParams,
data: {
tool: {
id: tool.id,
id: call.id,
toolName: '',
toolAvatar: '',
params,
@ -262,7 +262,8 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
const {
response,
usages = [],
isEnd
isEnd,
streamResponse = true
} = await (async () => {
try {
if (toolId === SubAppIds.stop) {
@ -278,9 +279,11 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
model,
temperature,
top_p: aiChatTopP,
stream,
onStreaming({ text }) {
//TODO: 需要一个新的 plan sse event
workflowStreamResponse?.({
id: call.id,
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
@ -315,7 +318,8 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
return {
response,
usages,
isEnd: false
isEnd: false,
streamResponse: false
};
} else if (toolId === SubAppIds.model) {
const { systemPrompt, task } = parseToolArgs<{
@ -332,6 +336,7 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
task,
onStreaming({ text }) {
workflowStreamResponse?.({
id: call.id,
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
@ -348,7 +353,8 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
return {
response,
usages,
isEnd: false
isEnd: false,
streamResponse: false
};
} else if (toolId === SubAppIds.fileRead) {
const { file_indexes } = parseToolArgs<{
@ -461,18 +467,21 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
})();
// Push stream response
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: call.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(response, 5000, 5000)
if (streamResponse) {
workflowStreamResponse?.({
id: call.id,
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: call.id,
toolName: '',
toolAvatar: '',
params: '',
response
}
}
}
});
});
}
// TODO: 推送账单

View File

@ -41,10 +41,8 @@ export type DispatchFlowResponse = {
durationSeconds: number;
};
export type WorkflowResponseType = ({
event,
data
}: {
export type WorkflowResponseType = (e: {
id?: string;
event: SseResponseEventEnum;
data: Record<string, any>;
}) => void;

View File

@ -40,13 +40,7 @@ export const getWorkflowResponseWrite = ({
id?: string;
showNodeStatus?: boolean;
}) => {
const fn: WorkflowResponseType = ({
event,
data
}: {
event: SseResponseEventEnum;
data: Record<string, any>;
}) => {
const fn: WorkflowResponseType = ({ id, event, data }) => {
if (!res || res.closed || !streamResponse) return;
// Forbid show detail
@ -68,7 +62,10 @@ export const getWorkflowResponseWrite = ({
responseWrite({
res,
event: detail ? event : undefined,
data: JSON.stringify(data)
data: JSON.stringify({
...data,
...(id && detail && { responseValueId: id })
})
});
};
return fn;

View File

@ -99,7 +99,7 @@ const AIContentCard = React.memo(function AIContentCard({
onOpenCiteModal
}: {
dataId: string;
chatValue: ChatItemValueItemType[];
chatValue: AIChatItemValueItemType[];
isLastChild: boolean;
isChatting: boolean;
questionGuides: string[];
@ -108,7 +108,7 @@ const AIContentCard = React.memo(function AIContentCard({
return (
<Flex flexDirection={'column'} gap={2}>
{chatValue.map((value, i) => {
const key = `${dataId}-ai-${i}`;
const key = value.id || `${dataId}-ai-${i}`;
return (
<AIResponseBox
@ -356,7 +356,7 @@ const ChatItem = (props: Props) => {
{type === ChatRoleEnum.AI && (
<>
<AIContentCard
chatValue={value}
chatValue={value as AIChatItemValueItemType[]}
dataId={chat.dataId}
isLastChild={isLastChild && i === splitAiResponseResults.length - 1}
isChatting={isChatting}

View File

@ -66,6 +66,7 @@ import { VariableInputEnum } from '@fastgpt/global/core/workflow/constants';
import { valueTypeFormat } from '@fastgpt/global/core/workflow/runtime/utils';
import { formatTime2YMDHMS } from '@fastgpt/global/common/string/time';
import { TeamErrEnum } from '@fastgpt/global/common/error/code/team';
import { cloneDeep } from 'lodash';
const FeedbackModal = dynamic(() => import('./components/FeedbackModal'));
const ReadFeedbackModal = dynamic(() => import('./components/ReadFeedbackModal'));
@ -231,6 +232,7 @@ const ChatBox = ({
const generatingMessage = useMemoizedFn(
({
responseValueId,
event,
text = '',
reasoningText,
@ -250,9 +252,13 @@ const ChatBox = ({
autoTTSResponse && splitText2Audio(formatChatValue2InputType(item.value).text || '');
const lastValue: AIChatItemValueItemType = JSON.parse(
JSON.stringify(item.value[item.value.length - 1])
);
const updateIndex = (() => {
if (!responseValueId) return item.value.length - 1;
const index = item.value.findIndex((item) => item.id === responseValueId);
if (index !== -1) return index;
return item.value.length - 1;
})();
const updateValue: AIChatItemValueItemType = cloneDeep(item.value[updateIndex]);
if (event === SseResponseEventEnum.flowNodeResponse && nodeResponse) {
return {
@ -268,11 +274,15 @@ const ChatBox = ({
moduleName: name
};
} else if (reasoningText) {
if (lastValue.type === ChatItemValueTypeEnum.reasoning && lastValue.reasoning) {
lastValue.reasoning.content += reasoningText;
if (updateValue.type === ChatItemValueTypeEnum.reasoning && updateValue.reasoning) {
updateValue.reasoning.content += reasoningText;
return {
...item,
value: item.value.slice(0, -1).concat(lastValue)
value: [
...item.value.slice(0, updateIndex),
updateValue,
...item.value.slice(updateIndex + 1)
]
};
} else {
const val: AIChatItemValueItemType = {
@ -283,14 +293,14 @@ const ChatBox = ({
};
return {
...item,
value: item.value.concat(val)
value: [...item.value, val]
};
}
} else if (
(event === SseResponseEventEnum.answer || event === SseResponseEventEnum.fastAnswer) &&
text
) {
if (!lastValue || !lastValue.text) {
if (!updateValue || !updateValue.text) {
const newValue: AIChatItemValueItemType = {
type: ChatItemValueTypeEnum.text,
text: {
@ -302,10 +312,14 @@ const ChatBox = ({
value: item.value.concat(newValue)
};
} else {
lastValue.text.content += text;
updateValue.text.content += text;
return {
...item,
value: item.value.slice(0, -1).concat(lastValue)
value: [
...item.value.slice(0, updateIndex),
updateValue,
...item.value.slice(updateIndex + 1)
]
};
}
} else if (event === SseResponseEventEnum.toolCall && tool) {
@ -320,10 +334,10 @@ const ChatBox = ({
} else if (
event === SseResponseEventEnum.toolParams &&
tool &&
lastValue.type === ChatItemValueTypeEnum.tool &&
lastValue?.tools
updateValue.type === ChatItemValueTypeEnum.tool &&
updateValue?.tools
) {
lastValue.tools = lastValue.tools.map((item) => {
updateValue.tools = updateValue.tools.map((item) => {
if (item.id === tool.id) {
item.params += tool.params;
}
@ -331,24 +345,32 @@ const ChatBox = ({
});
return {
...item,
value: item.value.slice(0, -1).concat(lastValue)
value: [
...item.value.slice(0, updateIndex),
updateValue,
...item.value.slice(updateIndex + 1)
]
};
} else if (event === SseResponseEventEnum.toolResponse && tool) {
} else if (
event === SseResponseEventEnum.toolResponse &&
tool &&
updateValue.type === ChatItemValueTypeEnum.tool &&
updateValue?.tools
) {
// replace tool response
updateValue.tools = updateValue.tools.map((item) => {
if (item.id === tool.id) {
item.response = item.response ? item.response + tool.response : tool.response;
}
return item;
});
return {
...item,
value: item.value.map((val) => {
if (val.type === ChatItemValueTypeEnum.tool && val.tools) {
const tools = val.tools.map((item) =>
item.id === tool.id ? { ...item, response: tool.response } : item
);
return {
...val,
tools
};
}
return val;
})
value: [
...item.value.slice(0, updateIndex),
updateValue,
...item.value.slice(updateIndex + 1)
]
};
} else if (event === SseResponseEventEnum.updateVariables && variables) {
resetVariables({ variables });

View File

@ -5,10 +5,12 @@ import type {
ToolModuleResponseItemType
} from '@fastgpt/global/core/chat/type';
import { ChatSiteItemType } from '@fastgpt/global/core/chat/type';
import type { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import type { WorkflowInteractiveResponseType } from '@fastgpt/global/core/workflow/template/system/interactive/type';
export type generatingMessageProps = {
event: SseResponseEventEnum;
responseValueId?: string;
text?: string;
reasoningText?: string;
name?: string;

View File

@ -24,12 +24,14 @@ export type StreamResponseType = {
};
type ResponseQueueItemType =
| {
responseValueId?: string;
event: SseResponseEventEnum.fastAnswer | SseResponseEventEnum.answer;
text?: string;
reasoningText?: string;
}
| { event: SseResponseEventEnum.interactive; [key: string]: any }
| { responseValueId?: string; event: SseResponseEventEnum.interactive; [key: string]: any }
| {
responseValueId?: string;
event:
| SseResponseEventEnum.toolCall
| SseResponseEventEnum.toolParams
@ -181,31 +183,36 @@ export const streamFetch = ({
})();
if (typeof parseJson !== 'object') return;
const { responseValueId, ...rest } = parseJson;
// console.log(parseJson, event);
if (event === SseResponseEventEnum.answer) {
const reasoningText = parseJson.choices?.[0]?.delta?.reasoning_content || '';
const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || '';
pushDataToQueue({
responseValueId,
event,
reasoningText
});
const text = parseJson.choices?.[0]?.delta?.content || '';
const text = rest.choices?.[0]?.delta?.content || '';
for (const item of text) {
pushDataToQueue({
responseValueId,
event,
text: item
});
}
} else if (event === SseResponseEventEnum.fastAnswer) {
const reasoningText = parseJson.choices?.[0]?.delta?.reasoning_content || '';
const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || '';
pushDataToQueue({
responseValueId,
event,
reasoningText
});
const text = parseJson.choices?.[0]?.delta?.content || '';
const text = rest.choices?.[0]?.delta?.content || '';
pushDataToQueue({
responseValueId,
event,
text
});
@ -215,29 +222,31 @@ export const streamFetch = ({
event === SseResponseEventEnum.toolResponse
) {
pushDataToQueue({
responseValueId,
event,
...parseJson
...rest
});
} else if (event === SseResponseEventEnum.flowNodeResponse) {
onMessage({
event,
nodeResponse: parseJson
nodeResponse: rest
});
} else if (event === SseResponseEventEnum.updateVariables) {
onMessage({
event,
variables: parseJson
variables: rest
});
} else if (event === SseResponseEventEnum.interactive) {
pushDataToQueue({
responseValueId,
event,
...parseJson
...rest
});
} else if (event === SseResponseEventEnum.error) {
if (parseJson.statusText === TeamErrEnum.aiPointsNotEnough) {
if (rest.statusText === TeamErrEnum.aiPointsNotEnough) {
useSystemStore.getState().setNotSufficientModalType(TeamErrEnum.aiPointsNotEnough);
}
errMsg = getErrText(parseJson, '流响应错误');
errMsg = getErrText(rest, '流响应错误');
} else if (
[SseResponseEventEnum.workflowDuration, SseResponseEventEnum.flowNodeStatus].includes(
event as any
@ -245,7 +254,7 @@ export const streamFetch = ({
) {
onMessage({
event,
...parseJson
...rest
});
}
},