FastGPT/projects/app/src/web/common/api/fetch.ts

import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { getErrText } from '@fastgpt/global/common/error/utils';
import type { StartChatFnProps } from '@/components/core/chat/ChatContainer/type';
import {
  // refer to https://github.com/ChatGPTNextWeb/ChatGPT-Next-Web
  EventStreamContentType,
  fetchEventSource
} from '@fortaine/fetch-event-source';
import { TeamErrEnum } from '@fastgpt/global/common/error/code/team';
import { useSystemStore } from '../system/useSystemStore';
import { formatTime2YMDHMW } from '@fastgpt/global/common/string/time';
import { getWebReqUrl } from '@fastgpt/web/common/system/utils';
import type { OnOptimizePromptProps } from '@/components/common/PromptEditor/OptimizerPopover';
import type { OnOptimizeCodeProps } from '@/pageComponents/app/detail/WorkflowComponents/Flow/nodes/NodeCode/Copilot';
import type { AIChatItemValueItemType } from '@fastgpt/global/core/chat/type';
import type { TopAgentFormDataType } from '@fastgpt/service/core/chat/HelperBot/dispatch/topAgent/type';

type StreamFetchProps = {
  url?: string;
  data: Record<string, any>;
  onMessage: StartChatFnProps['generatingMessage'];
  abortCtrl: AbortController;
};

export type StreamResponseType = {
  responseText: string;
};

type CommonResponseType = {
  responseValueId?: string;
  stepId?: string;
};
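
/**
 * Union of events that can pass through the render queue. In practice,
 * answer/fastAnswer, tool, interactive and agentPlan events are queued for
 * animated rendering, while other SSE events are forwarded to `onMessage`
 * directly in the message handler below.
 */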
type ResponseQueueItemType = CommonResponseType &
  (
    | {
        event: SseResponseEventEnum.fastAnswer | SseResponseEventEnum.answer;
        text?: string;
        reasoningText?: string;
      }
    | {
        event: SseResponseEventEnum.interactive;
        [key: string]: any;
      }
    | {
        event: SseResponseEventEnum.agentPlan;
        agentPlan: AIChatItemValueItemType['agentPlan'];
      }
    | {
        event:
          | SseResponseEventEnum.toolCall
          | SseResponseEventEnum.toolParams
          | SseResponseEventEnum.toolResponse;
        tools: any;
      }
    | {
        event: SseResponseEventEnum.formData;
        data: TopAgentFormDataType;
      }
  );

class FatalError extends Error {}
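
/**
 * Open an SSE stream against a chat-completions endpoint, buffer incoming
 * events, and replay them to `onMessage` on animation frames so long answers
 * render smoothly. Resolves with the accumulated answer text once the stream
 * closes or the caller aborts.
 *
 * A minimal usage sketch — the `data` payload shape below is a hypothetical
 * assumption (it is passed through as the request body, not validated here):
 * @example
 * const abortCtrl = new AbortController();
 * const { responseText } = await streamFetch({
 *   data: { appId: 'xxx', messages: [] }, // hypothetical payload
 *   onMessage: (msg) => console.log(msg),
 *   abortCtrl
 * });
 */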
export const streamFetch = ({
  url = '/api/v2/chat/completions',
  data,
  onMessage,
  abortCtrl
}: StreamFetchProps) =>
  new Promise<StreamResponseType>(async (resolve, reject) => {
    // Abort if the first response does not arrive within 60s
    const timeoutId = setTimeout(() => {
      abortCtrl.abort('Time out');
    }, 60000);

    // Response data
    let responseText = '';
    let responseQueue: ResponseQueueItemType[] = [];
    let errMsg: string | undefined;
    let finished = false;

    const finish = () => {
      if (errMsg !== undefined) {
        return failedFinish();
      }
      return resolve({
        responseText
      });
    };
    const failedFinish = (err?: any) => {
      finished = true;
      reject({
        message: getErrText(err, errMsg ?? '响应过程出现异常~'), // zh: "Something went wrong while responding"
        responseText
      });
    };

    const isAnswerEvent = (event: SseResponseEventEnum) =>
      event === SseResponseEventEnum.answer || event === SseResponseEventEnum.fastAnswer;

    // Animate the response so the text renders smoothly
    function animateResponseText() {
      // On abort, flush whatever is left in the queue and finish
      if (abortCtrl.signal.aborted) {
        responseQueue.forEach((item) => {
          onMessage(item);
          if (isAnswerEvent(item.event) && 'text' in item && item.text) {
            responseText += item.text;
          }
        });
        return finish();
      }

      if (responseQueue.length > 0) {
        // Drain roughly 1/30 of the backlog per frame so a long queue catches up faster
        const fetchCount = Math.max(1, Math.round(responseQueue.length / 30));
        for (let i = 0; i < fetchCount; i++) {
          const item = responseQueue[i];
          onMessage(item);
          if (isAnswerEvent(item.event) && 'text' in item && item.text) {
            responseText += item.text;
          }
        }
        responseQueue = responseQueue.slice(fetchCount);
      }

      if (finished && responseQueue.length === 0) {
        return finish();
      }
      requestAnimationFrame(animateResponseText);
    }
    // Start the animation loop
    animateResponseText();

    const pushDataToQueue = (data: ResponseQueueItemType) => {
      responseQueue.push(data);
      // requestAnimationFrame does not fire in hidden tabs, so flush directly
      if (document.hidden) {
        animateResponseText();
      }
    };

    try {
      // Auto-complete variables
      const variables = data?.variables || {};
      variables.cTime = formatTime2YMDHMW(new Date());

      const requestData = {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        signal: abortCtrl.signal,
        body: JSON.stringify({
          ...data,
          variables,
          detail: true,
          stream: true,
          retainDatasetCite: data.retainDatasetCite ?? true
        })
      };

      // Send request
      await fetchEventSource(getWebReqUrl(url), {
        ...requestData,
        async onopen(res) {
          clearTimeout(timeoutId);
          const contentType = res.headers.get('content-type');

          // Not a stream
          if (contentType?.startsWith('text/plain')) {
            return failedFinish(await res.clone().text());
          }

          // Failed stream
          if (
            !res.ok ||
            !res.headers.get('content-type')?.startsWith(EventStreamContentType) ||
            res.status !== 200
          ) {
            try {
              failedFinish(await res.clone().json());
            } catch {
              const errText = await res.clone().text();
              if (!errText.startsWith('event: error')) {
                failedFinish();
              }
            }
          }
        },
        onmessage: ({ event, data }) => {
          if (data === '[DONE]') {
            return;
          }

          // Parse text to JSON
          const parseJson = (() => {
            try {
              return JSON.parse(data);
            } catch (error) {
              return;
            }
          })();
          if (typeof parseJson !== 'object') return;

          const { responseValueId, stepId, ...rest } = parseJson;
          // console.log(parseJson, event);

          if (event === SseResponseEventEnum.answer) {
            const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || '';
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              reasoningText
            });

            // Queue the answer text character by character for a smooth typing effect
            const text = rest.choices?.[0]?.delta?.content || '';
            for (const item of text) {
              pushDataToQueue({
                responseValueId,
                stepId,
                event,
                text: item
              });
            }
          } else if (event === SseResponseEventEnum.fastAnswer) {
            const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || '';
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              reasoningText
            });

            // Fast answers are queued in a single chunk
            const text = rest.choices?.[0]?.delta?.content || '';
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              text
            });
          } else if (
            event === SseResponseEventEnum.toolCall ||
            event === SseResponseEventEnum.toolParams ||
            event === SseResponseEventEnum.toolResponse
          ) {
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              ...rest
            });
          } else if (event === SseResponseEventEnum.flowNodeResponse) {
            onMessage({
              event,
              nodeResponse: rest
            });
          } else if (event === SseResponseEventEnum.updateVariables) {
            onMessage({
              event,
              variables: rest
            });
          } else if (event === SseResponseEventEnum.interactive) {
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              ...rest
            });
          } else if (event === SseResponseEventEnum.agentPlan) {
            pushDataToQueue({
              responseValueId,
              stepId,
              event,
              agentPlan: rest.agentPlan
            });
          } else if (event === SseResponseEventEnum.formData) {
            // Call onMessage directly for formData; no need to queue
            onMessage({
              event,
              formData: rest
            });
          } else if (event === SseResponseEventEnum.generatedSkill) {
            // Call onMessage directly for generatedSkill; no need to queue
            onMessage({
              event,
              generatedSkill: rest
            });
          } else if (event === SseResponseEventEnum.error) {
            if (rest.statusText === TeamErrEnum.aiPointsNotEnough) {
              useSystemStore.getState().setNotSufficientModalType(TeamErrEnum.aiPointsNotEnough);
            }
            errMsg = getErrText(rest, '流响应错误'); // zh: "Stream response error"
          } else if (
            [SseResponseEventEnum.workflowDuration, SseResponseEventEnum.flowNodeStatus].includes(
              event as any
            )
          ) {
            onMessage({
              event,
              ...rest
            });
          }
        },
        onclose() {
          finished = true;
        },
        onerror(err) {
          console.log(err, 'fetch error');
          clearTimeout(timeoutId);
          failedFinish(getErrText(err));
          // Throwing from onerror stops fetchEventSource's automatic retry
          throw new Error(err);
        },
        openWhenHidden: true
      });
    } catch (err: any) {
      clearTimeout(timeoutId);

      if (abortCtrl.signal.aborted) {
        finished = true;
        return;
      }
      console.log(err, 'fetch error');
      failedFinish(err);
    }
  });
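
/**
 * Stream an optimized prompt from the prompt-optimizer endpoint, emitting
 * incremental answer text through `onResult`.
 *
 * A minimal usage sketch — the argument values are illustrative assumptions:
 * @example
 * await onOptimizePrompt({
 *   originalPrompt: 'Summarize the text',
 *   input: 'Make it more specific', // optimizer instruction
 *   model: 'gpt-4o-mini', // hypothetical model id
 *   onResult: (delta) => console.log(delta)
 * });
 */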
export const onOptimizePrompt = async ({
  originalPrompt,
  model,
  input,
  onResult,
  abortController
}: OnOptimizePromptProps) => {
  const controller = abortController || new AbortController();
  await streamFetch({
    url: '/api/core/ai/optimizePrompt',
    data: {
      originalPrompt,
      optimizerInput: input,
      model
    },
    onMessage: ({ event, text }) => {
      if (event === SseResponseEventEnum.answer && text) {
        onResult(text);
      }
    },
    abortCtrl: controller
  });
};
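
/**
 * Stream optimized code from the workflow code-optimizer endpoint. Works the
 * same way as `onOptimizePrompt`, with optional conversation history.
 *
 * A minimal usage sketch — the argument values are illustrative assumptions:
 * @example
 * await onOptimizeCode({
 *   optimizerInput: 'Refactor this loop to use Array.map',
 *   model: 'gpt-4o-mini', // hypothetical model id
 *   conversationHistory: [],
 *   onResult: (delta) => console.log(delta)
 * });
 */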
export const onOptimizeCode = async ({
  optimizerInput,
  model,
  conversationHistory = [],
  onResult,
  abortController
}: OnOptimizeCodeProps) => {
  const controller = abortController || new AbortController();
  await streamFetch({
    url: '/api/core/workflow/optimizeCode',
    data: {
      optimizerInput,
      model,
      conversationHistory
    },
    onMessage: ({ event, text }) => {
      if (event === SseResponseEventEnum.answer && text) {
        onResult(text);
      }
    },
    abortCtrl: controller
  });
};