FastGPT/packages/service/core/chat/saveChat.ts
2025-12-04 15:37:22 +08:00

682 lines
18 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import type {
AIChatItemType,
ChatHistoryItemResType,
UserChatItemType
} from '@fastgpt/global/core/chat/type';
import { MongoApp } from '../app/schema';
import type { ChatSourceEnum } from '@fastgpt/global/core/chat/constants';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { MongoChatItem } from './chatItemSchema';
import { MongoChat } from './chatSchema';
import { addLog } from '../../common/system/log';
import { mongoSessionRun } from '../../common/mongo/sessionRun';
import { type StoreNodeItemType } from '@fastgpt/global/core/workflow/type/node';
import { getAppChatConfig, getGuideModule } from '@fastgpt/global/core/workflow/utils';
import { type AppChatConfigType, type VariableItemType } from '@fastgpt/global/core/app/type';
import { mergeChatResponseData } from '@fastgpt/global/core/chat/utils';
import { pushChatLog } from './pushChatLog';
import {
FlowNodeTypeEnum,
FlowNodeInputTypeEnum
} from '@fastgpt/global/core/workflow/node/constant';
import { extractDeepestInteractive } from '@fastgpt/global/core/workflow/runtime/utils';
import { MongoAppChatLog } from '../app/logs/chatLogsSchema';
import { writePrimary } from '../../common/mongo/utils';
import { MongoChatItemResponse } from './chatItemResponseSchema';
import { chatValue2RuntimePrompt } from '@fastgpt/global/core/chat/adapt';
import type { ClientSession } from '../../common/mongo';
import { removeS3TTL } from '../../common/s3/utils';
import { VariableInputEnum } from '@fastgpt/global/core/workflow/constants';
import { encryptSecretValue, anyValueDecrypt } from '../../common/secret/utils';
import type { SecretValueType } from '@fastgpt/global/common/secret/type';
import { ConfirmPlanAgentText } from '@fastgpt/global/core/workflow/runtime/constants';
type Props = {
chatId: string;
appId: string;
teamId: string;
tmbId: string;
nodes: StoreNodeItemType[];
appChatConfig?: AppChatConfigType;
variables?: Record<string, any>;
isUpdateUseTime: boolean;
newTitle: string;
source: `${ChatSourceEnum}`;
sourceName?: string;
shareId?: string;
outLinkUid?: string;
userContent: UserChatItemType & { dataId?: string };
aiContent: AIChatItemType & { dataId?: string };
metadata?: Record<string, any>;
durationSeconds: number; //s
errorMsg?: string;
};
const beforProcess = (props: Props) => {
// Remove url
props.userContent.value.forEach((item) => {
if (item.file?.key) {
item.file.url = '';
}
});
};
const afterProcess = async ({
contents,
variables,
variableList,
session
}: {
contents: (UserChatItemType | AIChatItemType)[];
variables?: Record<string, any>;
variableList?: VariableItemType[];
session: ClientSession;
}) => {
const contentFileKeys = contents
.map((item) => {
if (item.value && Array.isArray(item.value)) {
return item.value.flatMap((valueItem) => {
const keys: string[] = [];
// 1. chat file
if ('file' in valueItem && valueItem.file?.key) {
keys.push(valueItem.file.key);
}
// 2. plugin input
if ('text' in valueItem && valueItem.text?.content) {
try {
const parsed = JSON.parse(valueItem.text.content);
// 2.1 plugin input - 数组格式
if (Array.isArray(parsed)) {
parsed.forEach((field) => {
if (field.value && Array.isArray(field.value)) {
field.value.forEach((file: { key: string }) => {
if (file.key && typeof file.key === 'string') {
keys.push(file.key);
}
});
}
});
}
// 2.2 form input - 对象格式 { "字段名": [{ key, url, ... }] }
else if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
Object.values(parsed).forEach((fieldValue) => {
if (Array.isArray(fieldValue)) {
fieldValue.forEach((file: any) => {
if (
file &&
typeof file === 'object' &&
file.key &&
typeof file.key === 'string'
) {
keys.push(file.key);
}
});
}
});
}
} catch (err) {}
}
return keys;
});
}
return [];
})
.flat()
.filter(Boolean) as string[];
const variableFileKeys: string[] = [];
if (variables && variableList) {
variableList.forEach((varItem) => {
if (varItem.type === VariableInputEnum.file) {
const varValue = variables[varItem.key];
if (Array.isArray(varValue)) {
variableFileKeys.push(...varValue.map((item) => item.key));
}
}
});
}
const allFileKeys = [...new Set([...contentFileKeys, ...variableFileKeys])];
if (allFileKeys.length > 0) {
await removeS3TTL({ key: allFileKeys, bucketName: 'private', session });
}
};
const formatAiContent = ({
aiContent,
durationSeconds,
errorMsg
}: {
aiContent: AIChatItemType & { dataId?: string };
durationSeconds: number;
errorMsg?: string;
}) => {
const { responseData, ...aiResponse } = aiContent;
const citeCollectionIds = new Set<string>();
const nodeResponses = responseData?.map((responseItem) => {
if (responseItem.moduleType === FlowNodeTypeEnum.datasetSearchNode && responseItem.quoteList) {
return {
...responseItem,
quoteList: responseItem.quoteList.map((quote) => {
citeCollectionIds.add(quote.collectionId);
return {
id: quote.id,
chunkIndex: quote.chunkIndex,
datasetId: quote.datasetId,
collectionId: quote.collectionId,
sourceId: quote.sourceId,
sourceName: quote.sourceName,
score: quote.score
};
})
};
}
return responseItem;
}) as ChatHistoryItemResType[] | undefined;
return {
aiResponse: {
...aiResponse,
durationSeconds,
errorMsg,
citeCollectionIds: Array.from(citeCollectionIds)
},
nodeResponses,
citeCollectionIds
};
};
const getChatDataLog = async ({
nodeResponses
}: {
nodeResponses: ReturnType<typeof formatAiContent>['nodeResponses'];
}) => {
const now = new Date();
const fifteenMinutesAgo = new Date(now.getTime() - 15 * 60 * 1000);
const errorCount = nodeResponses?.some((item) => item.errorText) ? 1 : 0;
const totalPoints =
nodeResponses?.reduce((sum: number, item: any) => sum + (item.totalPoints || 0), 0) || 0;
return {
fifteenMinutesAgo,
errorCount,
totalPoints,
now
};
};
export const pushChatRecords = async (props: Props) => {
beforProcess(props);
const {
chatId,
appId,
teamId,
tmbId,
nodes,
appChatConfig,
variables,
isUpdateUseTime,
newTitle,
source,
sourceName,
shareId,
outLinkUid,
userContent,
aiContent,
durationSeconds,
errorMsg,
metadata = {}
} = props;
if (!chatId || chatId === 'NO_RECORD_HISTORIES') return;
try {
const chat = await MongoChat.findOne(
{
appId,
chatId
},
'_id metadata'
);
const metadataUpdate = {
...chat?.metadata,
...metadata
};
const { welcomeText, variables: variableList } = getAppChatConfig({
chatConfig: appChatConfig,
systemConfigNode: getGuideModule(nodes),
isPublicFetch: false
});
const pluginInputs = nodes?.find(
(node) => node.flowNodeType === FlowNodeTypeEnum.pluginInput
)?.inputs;
// Format save chat content: Remove quote q/a
const { aiResponse, nodeResponses } = formatAiContent({
aiContent,
durationSeconds,
errorMsg
});
const processedContent = [userContent, aiResponse];
await mongoSessionRun(async (session) => {
const [{ _id: chatItemIdHuman }, { _id: chatItemIdAi, dataId }] = await MongoChatItem.create(
processedContent.map((item) => ({
chatId,
teamId,
tmbId,
appId,
...item
})),
{ session, ordered: true, ...writePrimary }
);
// Create chat item respones
if (nodeResponses) {
await MongoChatItemResponse.create(
nodeResponses.map((item) => ({
teamId,
appId,
chatId,
chatItemDataId: dataId,
data: item
})),
{ session, ordered: true, ...writePrimary }
);
}
await MongoChat.updateOne(
{
appId,
chatId
},
{
$set: {
teamId,
tmbId,
appId,
chatId,
variableList,
welcomeText,
variables: variables || {},
pluginInputs,
title: newTitle,
source,
sourceName,
shareId,
outLinkUid,
metadata: metadataUpdate,
updateTime: new Date()
}
},
{
session,
upsert: true,
...writePrimary
}
);
await afterProcess({
contents: processedContent,
variables,
variableList,
session
});
pushChatLog({
chatId,
chatItemIdHuman: String(chatItemIdHuman),
chatItemIdAi: String(chatItemIdAi),
appId
});
});
// Create chat data log
try {
const { fifteenMinutesAgo, errorCount, totalPoints, now } = await getChatDataLog({
nodeResponses
});
const userId = String(outLinkUid || tmbId);
const hasHistoryChat = await MongoAppChatLog.exists({
teamId,
appId,
userId,
createTime: { $lt: now }
});
await MongoAppChatLog.updateOne(
{
teamId,
appId,
chatId,
updateTime: { $gte: fifteenMinutesAgo }
},
{
$inc: {
chatItemCount: 1,
errorCount,
totalPoints,
totalResponseTime: durationSeconds
},
$set: {
updateTime: now,
sourceName
},
$setOnInsert: {
appId,
teamId,
chatId,
userId,
source,
createTime: now,
goodFeedbackCount: 0,
badFeedbackCount: 0,
isFirstChat: !hasHistoryChat
}
},
{
upsert: true,
...writePrimary
}
);
} catch (error) {
addLog.error('Push chat log error', error);
}
if (isUpdateUseTime) {
await MongoApp.updateOne(
{ _id: appId },
{
updateTime: new Date()
},
{
...writePrimary
}
).catch();
}
} catch (error) {
addLog.error(`Save chat history error`, error);
}
};
/*
更新交互节点,包含两种情况:
1. 更新当前的 items并把 value 追加到当前 items
2. 新增 items, 次数只需要改当前的 items 里的交互节点值即可,其他属性追加在新增的 items 里
*/
export const updateInteractiveChat = async (props: Props) => {
beforProcess(props);
const {
teamId,
chatId,
appId,
nodes,
appChatConfig,
userContent,
aiContent,
variables,
durationSeconds,
errorMsg
} = props;
if (!chatId) return;
const { variables: variableList } = getAppChatConfig({
chatConfig: appChatConfig,
systemConfigNode: getGuideModule(nodes),
isPublicFetch: false
});
const chatItem = await MongoChatItem.findOne({ appId, chatId, obj: ChatRoleEnum.AI }).sort({
_id: -1
});
if (!chatItem || chatItem.obj !== ChatRoleEnum.AI) return;
// Get interactive value
const interactiveValue = chatItem.value[chatItem.value.length - 1];
if (!interactiveValue || !interactiveValue.interactive) {
return;
}
interactiveValue.interactive.params = interactiveValue.interactive.params || {};
// Get interactive response
const { text: userInteractiveVal } = chatValue2RuntimePrompt(userContent.value);
// 拿到的是实参
const finalInteractive = extractDeepestInteractive(interactiveValue.interactive);
/*
需要追加一条 chat_items 记录,而不是修改原来的。
1. Ask query: 用户肯定会输入一条新消息
2. Plan check 非确认模式,用户也是输入一条消息。
*/
const pushNewItems =
finalInteractive.type === 'agentPlanAskQuery' ||
(finalInteractive.type === 'agentPlanCheck' && userInteractiveVal !== ConfirmPlanAgentText);
if (pushNewItems) {
return await pushChatRecords(props);
}
const parsedUserInteractiveVal = (() => {
try {
return JSON.parse(userInteractiveVal);
} catch (err) {
return userInteractiveVal;
}
})();
const { aiResponse, nodeResponses } = formatAiContent({
aiContent,
durationSeconds,
errorMsg
});
/*
在原来 chat_items 上更新。
1. 更新交互响应结果
2. 合并 chat_item 数据
3. 合并 chat_item_response 数据
*/
// Update interactive value
{
if (
finalInteractive.type === 'userSelect' ||
finalInteractive.type === 'agentPlanAskUserSelect'
) {
finalInteractive.params.userSelectedVal = userInteractiveVal;
} else if (
(finalInteractive.type === 'userInput' || finalInteractive.type === 'agentPlanAskUserForm') &&
typeof parsedUserInteractiveVal === 'object'
) {
finalInteractive.params.inputForm = finalInteractive.params.inputForm.map((item) => {
const itemValue = parsedUserInteractiveVal[item.key];
if (itemValue === undefined) return item;
// 如果是密码类型,加密后存储
if (item.type === FlowNodeInputTypeEnum.password) {
const decryptedVal = anyValueDecrypt(itemValue);
if (typeof decryptedVal === 'string') {
return {
...item,
value: encryptSecretValue({
value: decryptedVal,
secret: ''
} as SecretValueType)
};
}
return {
...item,
value: itemValue
};
}
return {
...item,
value: itemValue
};
});
finalInteractive.params.submitted = true;
} else if (finalInteractive.type === 'paymentPause') {
chatItem.value.pop();
} else if (finalInteractive.type === 'agentPlanCheck') {
finalInteractive.params.confirmed = true;
}
}
// Update current items
{
if (aiContent.customFeedbacks) {
chatItem.customFeedbacks = chatItem.customFeedbacks
? [...chatItem.customFeedbacks, ...aiContent.customFeedbacks]
: aiContent.customFeedbacks;
}
if (aiContent.value) {
chatItem.value = chatItem.value ? [...chatItem.value, ...aiContent.value] : aiContent.value;
}
if (aiResponse.citeCollectionIds) {
chatItem.citeCollectionIds = chatItem.citeCollectionIds
? [...chatItem.citeCollectionIds, ...aiResponse.citeCollectionIds]
: aiResponse.citeCollectionIds;
}
if (aiContent.memories) {
chatItem.memories = {
...chatItem.memories,
...aiContent.memories
};
}
chatItem.durationSeconds = chatItem.durationSeconds
? +(chatItem.durationSeconds + durationSeconds).toFixed(2)
: durationSeconds;
}
chatItem.markModified('value');
await mongoSessionRun(async (session) => {
// Merge chat item respones
if (nodeResponses) {
const lastResponse = await MongoChatItemResponse.findOne({
appId,
chatId,
chatItemDataId: chatItem.dataId
})
.sort({
_id: -1
})
.lean()
.session(session);
const newResponses = lastResponse?.data
? mergeChatResponseData([lastResponse?.data, ...nodeResponses])
: nodeResponses;
await MongoChatItemResponse.create(
newResponses.map((item) => ({
teamId,
appId,
chatId,
chatItemDataId: chatItem.dataId,
data: item
})),
{ session, ordered: true, ...writePrimary }
);
}
await chatItem.save({ session });
await MongoChat.updateOne(
{
appId,
chatId
},
{
$set: {
variables,
updateTime: new Date()
}
},
{
session
}
);
// Create chat item respones
if (nodeResponses) {
// Merge
const lastResponse = await MongoChatItemResponse.findOneAndDelete({
appId,
chatId,
chatItemDataId: chatItem.dataId
})
.sort({
_id: -1
})
.lean()
.session(session);
const newResponses = lastResponse?.data
? // @ts-ignore
mergeChatResponseData([lastResponse?.data, ...nodeResponses])
: nodeResponses;
await MongoChatItemResponse.create(
newResponses.map((item) => ({
teamId,
appId,
chatId,
chatItemDataId: chatItem.dataId,
data: item
})),
{ session, ordered: true, ...writePrimary }
);
}
await afterProcess({
contents: [userContent, aiContent],
variables,
variableList,
session
});
});
// Push chat data logs
try {
const { fifteenMinutesAgo, errorCount, totalPoints, now } = await getChatDataLog({
nodeResponses
});
await MongoAppChatLog.updateOne(
{
teamId,
appId,
chatId,
updateTime: { $gte: fifteenMinutesAgo }
},
{
$inc: {
chatItemCount: 1,
errorCount,
totalPoints,
totalResponseTime: durationSeconds
},
$set: {
updateTime: now
}
},
{
...writePrimary
}
);
} catch (error) {
addLog.error('update interactive chat log error', error);
}
};