perf: index (#6131)

* perf: index

* stop design doc

* perf: stop workflow;perf: mongo connection

* fix: ts

* mq export
This commit is contained in:
Archer 2025-12-21 19:15:10 +08:00 committed by GitHub
parent 4f95f6867e
commit 2fea73bb68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1419 additions and 238 deletions

View File

@ -0,0 +1,672 @@
---
name: workflow-stop-design
description: 工作流暂停逻辑设计方案
---
## 1. Redis 状态管理方案
### 1.1 状态键设计
**Redis Key 结构:**
```typescript
// Key 格式: agent_runtime_stopping:{appId}:{chatId}
const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping';
type WorkflowStatusKey = `${typeof WORKFLOW_STATUS_PREFIX}:${string}:${string}`;
// 示例: agent_runtime_stopping:app_123456:chat_789012
```
**状态值设计:**
- **存在键 (值为 1)**: 工作流应该停止
- **不存在键**: 工作流正常运行
- **设计简化**: 不使用状态枚举,仅通过键的存在与否判断
**参数类型定义:**
```typescript
type WorkflowStatusParams = {
appId: string;
chatId: string;
};
```
### 1.2 状态生命周期管理
**状态转换流程:**
```
正常运行(无键) → 停止中(键存在) → 完成(删除键)
```
**TTL 设置:**
- **停止标志 TTL**: 60 秒
- 原因: 避免因意外情况导致的键泄漏
- 正常情况下会在工作流完成时主动删除
- **工作流完成后**: 直接删除 Redis 键
- 原因: 不需要保留终态,减少 Redis 内存占用
### 1.3 核心函数说明
**1. setAgentRuntimeStop**
- **功能**: 设置停止标志
- **参数**: `{ appId, chatId }`
- **实现**: 使用 `SETEX` 命令,设置键值为 1,TTL 60 秒
**2. shouldWorkflowStop**
- **功能**: 检查工作流是否应该停止
- **参数**: `{ appId, chatId }`
- **返回**: `Promise<boolean>` - true=应该停止, false=继续运行
- **实现**: GET 命令获取键值,存在则返回 true
**3. delAgentRuntimeStopSign**
- **功能**: 删除停止标志
- **参数**: `{ appId, chatId }`
- **实现**: DEL 命令删除键
**4. waitForWorkflowComplete**
- **功能**: 等待工作流完成(停止标志被删除)
- **参数**: `{ appId, chatId, timeout?, pollInterval? }`
- **实现**: 轮询检查停止标志是否被删除,超时返回
### 1.4 边界情况处理
**1. Redis 操作失败**
- **错误处理**: 所有 Redis 操作都包含 `.catch()` 错误处理
- **降级策略**:
- `shouldWorkflowStop`: 出错时返回 `false` (认为不需要停止,继续运行)
- `delAgentRuntimeStopSign`: 出错时记录错误日志,但不影响主流程
- **设计原因**: Redis 异常不应阻塞工作流运行,降级到继续执行策略
**2. TTL 自动清理**
- **TTL 设置**: 60 秒
- **清理时机**: Redis 自动清理过期键
- **设计原因**:
- 避免因异常情况导致的 Redis 键泄漏
- 自动清理减少手动维护成本
- 60 秒足够大多数工作流完成停止操作
**3. stop 接口等待超时**
- **超时时间**: 5 秒
- **超时策略**: `waitForWorkflowComplete` 在 5 秒内轮询检查停止标志是否被删除
- **超时处理**: 5 秒后直接返回,不影响工作流继续执行
- **设计原因**:
- 避免前端长时间等待
- 5 秒足够大多数节点完成当前操作
- 用户体验优先,超时后前端可选择重试或放弃
**4. 并发停止请求**
- **处理方式**: 多次调用 `setAgentRuntimeStop` 是安全的,Redis SETEX 是幂等操作
- **设计原因**: 避免用户多次点击停止按钮导致的问题
---
## 2. Redis 工具函数实现
**位置**: `packages/service/core/workflow/dispatch/workflowStatus.ts`
```typescript
import { addLog } from '../../../common/system/log';
import { getGlobalRedisConnection } from '../../../common/redis/index';
import { delay } from '@fastgpt/global/common/system/utils';
const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping';
const TTL = 60; // 60秒
export const StopStatus = 'STOPPING';
export type WorkflowStatusParams = {
appId: string;
chatId: string;
};
// 获取工作流状态键
export const getRuntimeStatusKey = (params: WorkflowStatusParams): string => {
return `${WORKFLOW_STATUS_PREFIX}:${params.appId}:${params.chatId}`;
};
// 设置停止标志
export const setAgentRuntimeStop = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.setex(key, TTL, 1);
};
// 删除停止标志
export const delAgentRuntimeStopSign = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.del(key).catch((err) => {
addLog.error(`[Agent Runtime Stop] Delete stop sign error`, err);
});
};
// 检查工作流是否应该停止
export const shouldWorkflowStop = (params: WorkflowStatusParams): Promise<boolean> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
return redis
.get(key)
.then((res) => !!res)
.catch(() => false);
};
/**
* 等待工作流完成(停止标志被删除)
* @param params 工作流参数
* @param timeout 超时时间(毫秒),默认5秒
* @param pollInterval 轮询间隔(毫秒),默认50毫秒
*/
export const waitForWorkflowComplete = async ({
appId,
chatId,
timeout = 5000,
pollInterval = 50
}: {
appId: string;
chatId: string;
timeout?: number;
pollInterval?: number;
}) => {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const sign = await shouldWorkflowStop({ appId, chatId });
// 如果停止标志已被删除,说明工作流已完成
if (!sign) {
return;
}
// 等待下一次轮询
await delay(pollInterval);
}
// 超时后直接返回
return;
};
```
**测试用例位置**: `test/cases/service/core/app/workflow/workflowStatus.test.ts`
```typescript
import { describe, test, expect, beforeEach } from 'vitest';
import {
setAgentRuntimeStop,
delAgentRuntimeStopSign,
shouldWorkflowStop,
waitForWorkflowComplete
} from '@fastgpt/service/core/workflow/dispatch/workflowStatus';
describe('Workflow Status Redis Functions', () => {
const testAppId = 'test_app_123';
const testChatId = 'test_chat_456';
beforeEach(async () => {
// 清理测试数据
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
});
test('should set stopping sign', async () => {
await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId });
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
test('should return false for non-existent status', async () => {
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should return false after deleting stop sign', async () => {
await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId });
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should wait for workflow completion', async () => {
// 设置初始停止标志
await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId });
// 模拟异步完成(删除停止标志)
setTimeout(async () => {
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
}, 500);
// 等待完成
await waitForWorkflowComplete({
appId: testAppId,
chatId: testChatId,
timeout: 2000
});
// 验证停止标志已被删除
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should timeout when waiting too long', async () => {
await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId });
// 等待超时(不删除标志)
await waitForWorkflowComplete({
appId: testAppId,
chatId: testChatId,
timeout: 100
});
// 验证停止标志仍然存在
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
test('should handle concurrent stop sign operations', async () => {
// 并发设置停止标志
await Promise.all([
setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }),
setAgentRuntimeStop({ appId: testAppId, chatId: testChatId })
]);
// 停止标志应该存在
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
});
```
## 3. 工作流停止检测机制改造
### 3.1 修改位置
**文件**: `packages/service/core/workflow/dispatch/index.ts`
### 3.2 工作流启动时的停止检测机制
**改造点 1: 停止检测逻辑 (行 196-216)**
使用内存变量 + 定时轮询 Redis 的方式:
```typescript
import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus';
// 初始化停止检测
let stopping = false;
const checkIsStopping = (): boolean => {
if (apiVersion === 'v2') {
return stopping;
}
if (apiVersion === 'v1') {
if (!res) return false;
return res.closed || !!res.errored;
}
return false;
};
// v2 版本: 启动定时器定期检查 Redis
const checkStoppingTimer =
apiVersion === 'v2'
? setInterval(async () => {
stopping = await shouldWorkflowStop({
appId: runningAppInfo.id,
chatId
});
}, 100)
: undefined;
```
**设计要点**:
- v2 版本使用内存变量 `stopping` + 100ms 定时器轮询 Redis
- v1 版本仍使用原有的 `res.closed/res.errored` 检测
- 轮询频率 100ms,平衡性能和响应速度
**改造点 2: 工作流完成后清理 (行 232-249)**
```typescript
return runWorkflow({
...data,
checkIsStopping, // 传递检测函数
query,
histories,
// ... 其他参数
}).finally(async () => {
// 清理定时器
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
}
if (checkStoppingTimer) {
clearInterval(checkStoppingTimer);
}
// Close mcpClient connections
Object.values(mcpClientMemory).forEach((client) => {
client.closeConnection();
});
// 工作流完成后删除 Redis 记录
await delAgentRuntimeStopSign({
appId: runningAppInfo.id,
chatId
});
});
```
### 3.3 节点执行前的停止检测
**位置**: `packages/service/core/workflow/dispatch/index.ts:861-868`
`checkNodeCanRun` 方法中,每个节点执行前检查:
```typescript
private async checkNodeCanRun(
node: RuntimeNodeItemType,
skippedNodeIdList = new Set<string>()
) {
// ... 其他检查逻辑 ...
// Check queue status
if (data.maxRunTimes <= 0) {
addLog.error('Max run times is 0', {
appId: data.runningAppInfo.id
});
return;
}
// 停止检测
if (checkIsStopping()) {
addLog.warn('Workflow stopped', {
appId: data.runningAppInfo.id,
nodeId: node.nodeId,
nodeName: node.name
});
return;
}
// ... 执行节点逻辑 ...
}
```
**说明**:
- 直接调用 `checkIsStopping()` 同步方法
- 内部会检查内存变量 `stopping`
- 定时器每 100ms 更新一次该变量
- 检测到停止时记录日志并直接返回,不执行节点
## 4. v2/chat/stop 接口设计
### 4.1 接口规范
**接口路径**: `/api/v2/chat/stop`
**Schema 位置**: `packages/global/openapi/core/chat/api.ts`
**接口文档位置**: `packages/global/openapi/core/chat/index.ts`
**请求方法**: POST
**请求参数**:
```typescript
// packages/global/openapi/core/chat/api.ts
export const StopV2ChatSchema = z
.object({
appId: ObjectIdSchema.describe('应用ID'),
chatId: z.string().min(1).describe('对话ID'),
outLinkAuthData: OutLinkChatAuthSchema.optional().describe('外链鉴权数据')
});
export type StopV2ChatParams = z.infer<typeof StopV2ChatSchema>;
```
**响应格式**:
```typescript
export const StopV2ChatResponseSchema = z
.object({
success: z.boolean().describe('是否成功停止')
});
export type StopV2ChatResponse = z.infer<typeof StopV2ChatResponseSchema>;
```
### 4.2 接口实现
**文件位置**: `projects/app/src/pages/api/v2/chat/stop.ts`
```typescript
import type { NextApiRequest, NextApiResponse } from 'next';
import { NextAPI } from '@/service/middleware/entry';
import { authChatCrud } from '@/service/support/permission/auth/chat';
import {
setAgentRuntimeStop,
waitForWorkflowComplete
} from '@fastgpt/service/core/workflow/dispatch/workflowStatus';
import { StopV2ChatSchema, type StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api';
async function handler(req: NextApiRequest, res: NextApiResponse): Promise<StopV2ChatResponse> {
const { appId, chatId, outLinkAuthData } = StopV2ChatSchema.parse(req.body);
// 鉴权 (复用聊天 CRUD 鉴权)
await authChatCrud({
req,
authToken: true,
authApiKey: true,
appId,
chatId,
...outLinkAuthData
});
// 设置停止标志
await setAgentRuntimeStop({
appId,
chatId
});
// 等待工作流完成 (最多等待 5 秒)
await waitForWorkflowComplete({ appId, chatId, timeout: 5000 });
return {
success: true
};
}
export default NextAPI(handler);
```
**接口文档** (`packages/global/openapi/core/chat/index.ts`):
```typescript
export const ChatPath: OpenAPIPath = {
// ... 其他路径
'/v2/chat/stop': {
post: {
summary: '停止 Agent 运行',
description: `优雅停止正在运行的 Agent, 会尝试等待当前节点结束后返回,最长 5s超过 5s 仍未结束,则会返回成功。
LLM 节点,流输出时会同时被终止,但 HTTP 请求节点这种可能长时间运行的,不会被终止。`,
tags: [TagsMap.chatPage],
requestBody: {
content: {
'application/json': {
schema: StopV2ChatSchema
}
}
},
responses: {
200: {
description: '成功停止工作流',
content: {
'application/json': {
schema: StopV2ChatResponseSchema
}
}
}
}
}
}
};
```
**说明**:
- 接口使用 `authChatCrud` 进行鉴权,支持 Token 和 API Key
- 支持分享链接和团队空间的鉴权数据
- 设置停止标志后等待最多 5 秒
- 无论是否超时,都返回 `success: true`
## 5. 前端改造
由于当前代码已经能够正常工作,且 v2 版本的后端已经实现了基于 Redis 的停止机制,前端可以保持现有的简单实现:
**保持现有实现的原因**:
1. 后端已经通过定时器轮询 Redis 实现了停止检测
2. 前端调用 `abort()` 后,后端会在下个检测周期(100ms内)发现停止标志
3. 简化前端逻辑,避免增加复杂性
4. 用户体验上,立即中断连接响应更快
**可选的增强方案**:
如果需要在前端显示更详细的停止状态,可以添加 API 客户端函数:
**文件位置**: `projects/app/src/web/core/chat/api.ts`
```typescript
import { POST } from '@/web/common/api/request';
import type { StopV2ChatParams, StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api';
/**
* 停止 v2 版本工作流运行
*/
export const stopV2Chat = (data: StopV2ChatParams) =>
POST<StopV2ChatResponse>('/api/v2/chat/stop', data);
```
**增强的 abortRequest 函数**:
```typescript
/* Abort chat completions, questionGuide */
const abortRequest = useMemoizedFn(async (reason: string = 'stop') => {
// 先调用 abort 中断连接
chatController.current?.abort(new Error(reason));
questionGuideController.current?.abort(new Error(reason));
pluginController.current?.abort(new Error(reason));
// v2 版本: 可选地通知后端优雅停止
if (chatBoxData?.app?.version === 'v2' && appId && chatId) {
try {
await stopV2Chat({
appId,
chatId,
outLinkAuthData
});
} catch (error) {
// 静默失败,不影响用户体验
console.warn('Failed to notify backend to stop workflow', error);
}
}
});
```
**建议**:
- **推荐**: 保持当前简单实现,后端已经足够健壮
- **可选**: 如果需要更精确的停止状态追踪,可以实现上述增强方案
## 6. 完整调用流程
### 6.1 正常停止流程
```
用户点击停止按钮
前端: abortRequest()
前端: chatController.abort() [立即中断 HTTP 连接]
[可选] 前端: POST /api/v2/chat/stop
后端: setAgentRuntimeStop(appId, chatId) [设置停止标志]
后端: 定时器检测到 Redis 停止标志,更新内存变量 stopping = true
后端: 下个节点执行前 checkIsStopping() 返回 true
后端: 停止处理新节点,记录日志
后端: 工作流 finally 块删除 Redis 停止标志
[可选] 后端: waitForWorkflowComplete() 检测到停止标志被删除
[可选] 前端: 显示停止成功提示
```
### 6.2 超时流程
```
[可选] 前端: POST /api/v2/chat/stop
后端: setAgentRuntimeStop(appId, chatId)
后端: waitForWorkflowComplete(timeout=5s)
后端: 5秒后停止标志仍存在
后端: 返回成功响应 (不区分超时)
[可选] 前端: 显示成功提示
后端: 工作流继续运行,最终完成后删除停止标志
```
### 6.3 工作流自然完成流程
```
工作流运行中
所有节点执行完成
dispatchWorkFlow.finally()
删除 Redis 停止标志
清理定时器
60秒 TTL 确保即使删除失败也会自动清理
```
### 6.4 时序说明
**关键时间点**:
- **100ms**: 后端定时器检查 Redis 停止标志的频率
- **5s**: stop 接口等待工作流完成的超时时间
- **60s**: Redis 键的 TTL,自动清理时间
**响应时间**:
- 用户点击停止 → HTTP 连接中断: **立即** (前端 abort)
- 停止标志写入 Redis: **< 50ms** (Redis SETEX 操作)
- 后端检测到停止: **< 100ms** (定时器轮询周期)
- 当前节点停止执行: **取决于节点类型**
- LLM 流式输出: **立即**中断流
- HTTP 请求节点: **等待请求完成**
- 其他节点: **等待当前操作完成**
## 7. 测试策略
### 7.1 单元测试
**Redis 工具函数测试**:
- `setAgentRuntimeStop` / `shouldWorkflowStop` 基本功能
- `delAgentRuntimeStopSign` 删除功能
- `waitForWorkflowComplete` 等待机制和超时
- 并发操作安全性
**文件位置**: `test/cases/service/core/app/workflow/workflowStatus.test.ts`
**测试用例**:
```typescript
describe('Workflow Status Redis Functions', () => {
test('should set stopping sign')
test('should return false for non-existent status')
test('should detect stopping status')
test('should return false after deleting stop sign')
test('should wait for workflow completion')
test('should timeout when waiting too long')
test('should delete workflow stop sign')
test('should handle concurrent stop sign operations')
});
```

View File

@ -11,7 +11,7 @@ description: 'FastGPT V4.14.5 更新说明'
## ⚙️ 优化
1. 优化获取 redis 所有 key 的逻辑,避免大量获取时导致阻塞。
2. Redis 和 MQ 的重连逻辑优化。
2. MongoDB, Redis 和 MQ 的重连逻辑优化。
## 🐛 修复

View File

@ -120,7 +120,7 @@
"document/content/docs/upgrading/4-14/4142.mdx": "2025-11-18T19:27:14+08:00",
"document/content/docs/upgrading/4-14/4143.mdx": "2025-11-26T20:52:05+08:00",
"document/content/docs/upgrading/4-14/4144.mdx": "2025-12-16T14:56:04+08:00",
"document/content/docs/upgrading/4-14/4145.mdx": "2025-12-19T00:08:30+08:00",
"document/content/docs/upgrading/4-14/4145.mdx": "2025-12-20T13:11:02+08:00",
"document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00",
"document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00",
"document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00",

View File

@ -40,6 +40,7 @@ export type ExternalProviderType = {
/* workflow props */
export type ChatDispatchProps = {
res?: NextApiResponse;
checkIsStopping: () => boolean;
lang?: localeType;
requestOrigin?: string;
mode: 'test' | 'chat' | 'debug';
@ -63,7 +64,7 @@ export type ChatDispatchProps = {
};
uid: string; // Who run this workflow
chatId?: string;
chatId: string;
responseChatItemId?: string;
histories: ChatItemType[];
variables: Record<string, any>; // global variable
@ -76,7 +77,7 @@ export type ChatDispatchProps = {
maxRunTimes: number;
isToolCall?: boolean;
workflowStreamResponse?: WorkflowResponseType;
version?: 'v1' | 'v2';
apiVersion?: 'v1' | 'v2';
workflowDispatchDeep: number;

View File

@ -1,8 +1,38 @@
import type { OutLinkChatAuthType } from '../../../support/permission/chat/type';
import { OutLinkChatAuthSchema } from '../../../support/permission/chat/type';
import { ObjectIdSchema } from '../../../common/type/mongo';
import z from 'zod';
/* ============ v2/chat/stop ============ */
export const StopV2ChatSchema = z
.object({
appId: ObjectIdSchema.describe('应用ID'),
chatId: z.string().min(1).describe('对话ID'),
outLinkAuthData: OutLinkChatAuthSchema.optional().describe('外链鉴权数据')
})
.meta({
example: {
appId: '1234567890',
chatId: '1234567890',
outLinkAuthData: {
shareId: '1234567890',
outLinkUid: '1234567890'
}
}
});
export type StopV2ChatParams = z.infer<typeof StopV2ChatSchema>;
export const StopV2ChatResponseSchema = z
.object({
success: z.boolean().describe('是否成功停止')
})
.meta({
example: {
success: true
}
});
export type StopV2ChatResponse = z.infer<typeof StopV2ChatResponseSchema>;
/* ============ chat file ============ */
export const PresignChatFileGetUrlSchema = z
.object({
key: z.string().min(1).describe('文件key'),

View File

@ -5,7 +5,12 @@ import { ChatFeedbackPath } from './feedback/index';
import { ChatHistoryPath } from './history/index';
import { z } from 'zod';
import { CreatePostPresignedUrlResultSchema } from '../../../../service/common/s3/type';
import { PresignChatFileGetUrlSchema, PresignChatFilePostUrlSchema } from './api';
import {
PresignChatFileGetUrlSchema,
PresignChatFilePostUrlSchema,
StopV2ChatSchema,
StopV2ChatResponseSchema
} from './api';
import { TagsMap } from '../../tag';
export const ChatPath: OpenAPIPath = {
@ -14,6 +19,31 @@ export const ChatPath: OpenAPIPath = {
...ChatFeedbackPath,
...ChatHistoryPath,
'/v2/chat/stop': {
post: {
summary: '停止 Agent 运行',
description: `优雅停止正在运行的 Agent, 会尝试等待当前节点结束后返回,最长 5s超过 5s 仍未结束,则会返回成功。
LLM HTTP `,
tags: [TagsMap.chatPage],
requestBody: {
content: {
'application/json': {
schema: StopV2ChatSchema
}
}
},
responses: {
200: {
description: '成功停止工作流',
content: {
'application/json': {
schema: StopV2ChatResponseSchema
}
}
}
}
}
},
'/core/chat/presignChatFilePostUrl': {
post: {
summary: '获取文件上传 URL',

View File

@ -60,7 +60,7 @@ export function getQueue<DataType, ReturnType = void>(
// default error handler, to avoid unhandled exceptions
newQueue.on('error', (error) => {
addLog.error(`MQ Queue [${name}]: ${error.message}`, error);
addLog.error(`MQ Queue] error`, error);
});
queues.set(name, newQueue);
return newQueue;
@ -76,44 +76,59 @@ export function getWorker<DataType, ReturnType = void>(
return worker as Worker<DataType, ReturnType>;
}
const newWorker = new Worker<DataType, ReturnType>(name.toString(), processor, {
connection: newWorkerRedisConnection(),
...defaultWorkerOpts,
// BullMQ Worker important settings
lockDuration: 600000, // 10 minutes for large file operations
stalledInterval: 30000, // Check for stalled jobs every 30s
maxStalledCount: 3, // Move job to failed after 1 stall (default behavior)
...opts
});
// default error handler, to avoid unhandled exceptions
newWorker.on('error', async (error) => {
addLog.error(`MQ Worker error`, {
message: error.message,
data: { name }
const createWorker = () => {
const newWorker = new Worker<DataType, ReturnType>(name.toString(), processor, {
connection: newWorkerRedisConnection(),
...defaultWorkerOpts,
// BullMQ Worker important settings
lockDuration: 600000, // 10 minutes for large file operations
stalledInterval: 30000, // Check for stalled jobs every 30s
maxStalledCount: 3, // Move job to failed after 1 stall (default behavior)
...opts
});
await newWorker.close();
});
// Critical: Worker has been closed - remove from pool
newWorker.on('closed', async () => {
addLog.error(`MQ Worker [${name}] closed unexpectedly`, {
data: {
name,
message: 'Worker will need to be manually restarted'
// Worker is ready to process jobs (fired on initial connection and after reconnection)
newWorker.on('ready', () => {
addLog.info(`[MQ Worker] ready`, { name });
});
// default error handler, to avoid unhandled exceptions
newWorker.on('error', async (error) => {
addLog.error(`[MQ Worker] error`, {
message: error.message,
data: { name }
});
});
// Critical: Worker has been closed - remove from pool and restart
newWorker.on('closed', async () => {
addLog.warn(`[MQ Worker] closed, attempting restart...`);
// Clean up: remove all listeners to prevent memory leaks
newWorker.removeAllListeners();
// Retry create new worker with infinite retries
while (true) {
try {
// Call getWorker to create a new worker (now workers.get(name) returns undefined)
const worker = createWorker();
workers.set(name, worker);
addLog.info(`[MQ Worker] restarted successfully`);
break;
} catch (error) {
addLog.error(`[MQ Worker] failed to restart, retrying...`, error);
await delay(1000);
}
}
});
try {
newWorker.on('paused', async () => {
addLog.warn(`[MQ Worker] paused`);
await delay(1000);
workers.delete(name);
getWorker(name, processor, opts);
} catch (error) {}
});
newWorker.resume();
});
newWorker.on('paused', async () => {
addLog.warn(`MQ Worker [${name}] paused`);
await delay(1000);
newWorker.resume();
});
return newWorker;
};
const newWorker = createWorker();
workers.set(name, newWorker);
return newWorker;
}

View File

@ -31,26 +31,13 @@ export async function connectMongo(props: {
db.set('strictQuery', 'throw');
db.connection.on('error', async (error) => {
console.log('mongo error', error);
try {
if (db.connection.readyState !== 0) {
RemoveListeners();
await db.disconnect();
await delay(1000);
await connectMongo(props);
}
} catch (error) {}
console.error('mongo error', error);
});
db.connection.on('connected', async () => {
console.log('mongo connected');
});
db.connection.on('disconnected', async () => {
console.log('mongo disconnected');
try {
if (db.connection.readyState !== 0) {
RemoveListeners();
await db.disconnect();
await delay(1000);
await connectMongo(props);
}
} catch (error) {}
console.error('mongo disconnected');
});
await db.connect(url, {
@ -64,9 +51,9 @@ export async function connectMongo(props: {
maxIdleTimeMS: 300000, // 空闲连接超时: 5分钟,防止空闲连接长时间占用资源
retryWrites: true, // 重试写入: 重试写入失败的操作
retryReads: true, // 重试读取: 重试读取失败的操作
serverSelectionTimeoutMS: 10000 // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞
serverSelectionTimeoutMS: 10000, // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞
heartbeatFrequencyMS: 5000 // 5s 进行一次健康检查
});
console.log('mongo connected');
connectedCb?.();

View File

@ -19,9 +19,11 @@ const REDIS_BASE_OPTION = {
// Reconnect on specific errors (Redis master-slave switch, network issues)
reconnectOnError: (err: any) => {
const reconnectErrors = ['READONLY', 'ECONNREFUSED', 'ETIMEDOUT', 'ECONNRESET'];
const shouldReconnect = reconnectErrors.some((errType) => err.message.includes(errType));
const message = typeof err?.message === 'string' ? err.message : String(err ?? '');
const shouldReconnect = reconnectErrors.some((errType) => message.includes(errType));
if (shouldReconnect) {
addLog.warn(`Redis reconnecting due to error: ${err.message}`);
addLog.warn(`Redis reconnecting due to error: ${message}`);
}
return shouldReconnect;
},
@ -37,9 +39,6 @@ export const newQueueRedisConnection = () => {
// Limit retries for queue operations
maxRetriesPerRequest: 3
});
redis.on('error', (error) => {
addLog.error('[Redis Queue connection error]', error);
});
return redis;
};
@ -49,9 +48,6 @@ export const newWorkerRedisConnection = () => {
// BullMQ requires maxRetriesPerRequest: null for blocking operations
maxRetriesPerRequest: null
});
redis.on('error', (error) => {
addLog.error('[Redis Worker connection error]', error);
});
return redis;
};
@ -65,11 +61,14 @@ export const getGlobalRedisConnection = () => {
maxRetriesPerRequest: 3
});
global.redisClient.on('connect', () => {
addLog.info('[Global Redis] connected');
});
global.redisClient.on('error', (error) => {
addLog.error('[Redis Global connection error]', error);
addLog.error('[Global Redis] connection error', error);
});
global.redisClient.on('close', () => {
addLog.warn('[Redis Global connection closed]');
addLog.warn('[Global Redis] connection closed');
});
return global.redisClient;

View File

@ -40,7 +40,7 @@ export const addS3DelJob = async (data: S3MQJobData): Promise<void> => {
await queue.add('delete-s3-files', data, { jobId, ...jobOption });
};
const prefixDel = async (bucket: S3BaseBucket, prefix: string) => {
export const prefixDel = async (bucket: S3BaseBucket, prefix: string) => {
addLog.debug(`[S3 delete] delete prefix: ${prefix}`);
let tasks: Promise<any>[] = [];
return new Promise<void>(async (resolve, reject) => {
@ -103,7 +103,7 @@ export const startS3DelWorker = async () => {
}
},
{
concurrency: 3
concurrency: 6
}
);
};

View File

@ -196,6 +196,7 @@ try {
// timer, clear history
ChatSchema.index({ updateTime: -1, teamId: 1 });
ChatSchema.index({ teamId: 1, updateTime: -1 });
} catch (error) {
console.log(error);
}

View File

@ -64,6 +64,7 @@ export type ChatResponse = DispatchNodeResultType<
export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResponse> => {
let {
res,
checkIsStopping,
requestOrigin,
stream = false,
retainDatasetCite = true,
@ -201,7 +202,7 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
requestOrigin
},
userKey: externalProvider.openaiAccount,
isAborted: () => res?.closed,
isAborted: checkIsStopping,
onReasoning({ text }) {
if (!aiChatReasoning) return;
workflowStreamResponse?.({

View File

@ -18,6 +18,7 @@ export const runToolCall = async (props: DispatchToolModuleProps): Promise<RunTo
const { messages, toolNodes, toolModel, childrenInteractiveParams, ...workflowProps } = props;
const {
res,
checkIsStopping,
requestOrigin,
runtimeNodes,
runtimeEdges,
@ -129,7 +130,7 @@ export const runToolCall = async (props: DispatchToolModuleProps): Promise<RunTo
retainDatasetCite,
useVision: aiChatVision
},
isAborted: () => res?.closed,
isAborted: checkIsStopping,
userKey: externalProvider.openaiAccount,
onReasoning({ text }) {
if (!aiChatReasoning) return;

View File

@ -59,10 +59,11 @@ import { TeamErrEnum } from '@fastgpt/global/common/error/code/team';
import { i18nT } from '../../../../web/i18n/utils';
import { clone } from 'lodash';
import { validateFileUrlDomain } from '../../../common/security/fileUrlValidator';
import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus';
type Props = Omit<
ChatDispatchProps,
'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables'
'checkIsStopping' | 'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables'
> & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
@ -87,7 +88,17 @@ export async function dispatchWorkFlow({
concatUsage,
...data
}: Props & WorkflowUsageProps): Promise<DispatchFlowResponse> {
const { res, stream, runningUserInfo, runningAppInfo, lastInteractive, histories, query } = data;
const {
res,
stream,
runningUserInfo,
runningAppInfo,
lastInteractive,
histories,
query,
chatId,
apiVersion
} = data;
// Check url valid
const invalidInput = query.some((item) => {
@ -101,6 +112,8 @@ export async function dispatchWorkFlow({
addLog.info('[Workflow run] Invalid file url');
return Promise.reject(new UserError('Invalid file url'));
}
/* Init function */
// Check point
await checkTeamAIPoints(runningUserInfo.teamId);
@ -120,7 +133,22 @@ export async function dispatchWorkFlow({
});
}
return usageId;
})()
})(),
// Add preview url to chat items
await addPreviewUrlToChatItems(histories, 'chatFlow'),
// Add preview url to query
...query.map(async (item) => {
if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) return;
item.file.url = await getS3ChatSource().createGetChatFileURL({
key: item.file.key,
external: true
});
}),
// Remove stopping sign
delAgentRuntimeStopSign({
appId: runningAppInfo.id,
chatId
})
]);
let streamCheckTimer: NodeJS.Timeout | null = null;
@ -152,16 +180,6 @@ export async function dispatchWorkFlow({
}
}
// Add preview url to chat items
await addPreviewUrlToChatItems(histories, 'chatFlow');
for (const item of query) {
if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) continue;
item.file.url = await getS3ChatSource().createGetChatFileURL({
key: item.file.key,
external: true
});
}
// Get default variables
const cloneVariables = clone(data.variables);
const defaultVariables = {
@ -173,12 +191,34 @@ export async function dispatchWorkFlow({
timezone
}))
};
// MCP
let mcpClientMemory = {} as Record<string, MCPClient>;
// Stop sign(没有 apiVersion说明不会有暂停)
let stopping = false;
const checkIsStopping = (): boolean => {
if (apiVersion === 'v2') {
return stopping;
}
if (apiVersion === 'v1') {
if (!res) return false;
return res.closed || !!res.errored;
}
return false;
};
const checkStoppingTimer =
apiVersion === 'v2'
? setInterval(async () => {
stopping = await shouldWorkflowStop({
appId: runningAppInfo.id,
chatId
});
}, 100)
: undefined;
// Init some props
return runWorkflow({
...data,
checkIsStopping,
query,
histories,
timezone,
@ -189,15 +229,24 @@ export async function dispatchWorkFlow({
concatUsage,
mcpClientMemory,
cloneVariables
}).finally(() => {
}).finally(async () => {
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
}
if (checkStoppingTimer) {
clearInterval(checkStoppingTimer);
}
// Close mcpClient connections
Object.values(mcpClientMemory).forEach((client) => {
client.closeConnection();
});
// 工作流完成后删除 Redis 记录
await delAgentRuntimeStopSign({
appId: runningAppInfo.id,
chatId
});
});
}
@ -210,14 +259,14 @@ type RunWorkflowProps = ChatDispatchProps & {
};
export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
let {
res,
apiVersion,
checkIsStopping,
runtimeNodes = [],
runtimeEdges = [],
histories = [],
variables = {},
externalProvider,
retainDatasetCite = true,
version = 'v1',
responseDetail = true,
responseAllData = true,
usageId,
@ -328,10 +377,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});
}
get connectionIsActive(): boolean {
return !res?.closed && !res?.errored;
}
// Add active node to queue (if already in the queue, it will not be added again)
addActiveNode(nodeId: string) {
if (this.activeRunQueue.has(nodeId)) {
@ -585,7 +630,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
})();
// Response node response
if (version === 'v2' && !data.isToolCall && isRootRuntime && formatResponseData) {
if (apiVersion === 'v2' && !data.isToolCall && isRootRuntime && formatResponseData) {
data.workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeResponse,
data: responseAllData
@ -813,8 +858,8 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});
return;
}
if (!this.connectionIsActive) {
addLog.warn('Request is closed/errored', {
if (checkIsStopping()) {
addLog.warn('Workflow stopped', {
appId: data.runningAppInfo.id,
nodeId: node.nodeId,
nodeName: node.name

View File

@ -0,0 +1,79 @@
import { addLog } from '../../../common/system/log';
import { getGlobalRedisConnection } from '../../../common/redis/index';
import { delay } from '@fastgpt/global/common/system/utils';
const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping';
const TTL = 60; // 1分钟
export const StopStatus = 'STOPPING';
export type WorkflowStatusParams = {
appId: string;
chatId: string;
};
// 获取工作流状态键
export const getRuntimeStatusKey = (params: WorkflowStatusParams): string => {
return `${WORKFLOW_STATUS_PREFIX}:${params.appId}:${params.chatId}`;
};
// 暂停任务
export const setAgentRuntimeStop = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.set(key, 1, 'EX', TTL);
};
// 删除任务状态
export const delAgentRuntimeStopSign = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.del(key).catch((err) => {
addLog.error(`[Agent Runtime Stop] Delete stop sign error`, err);
});
};
// 检查工作流是否应该停止
export const shouldWorkflowStop = (params: WorkflowStatusParams): Promise<boolean> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
return redis
.get(key)
.then((res) => !!res)
.catch(() => false);
};
/**
* ()
* @param params
* @param timeout (),5
* @param pollInterval (),50
* @returns true=, false=
*/
export const waitForWorkflowComplete = async ({
appId,
chatId,
timeout = 5000,
pollInterval = 50
}: {
appId: string;
chatId: string;
timeout?: number;
pollInterval?: number;
}) => {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const sign = await shouldWorkflowStop({ appId, chatId });
// 如果没有暂停中的标志,则认为已经完成任务了。
if (!sign) {
return;
}
// 等待下一次轮询
await delay(pollInterval);
}
return;
};

View File

@ -17,7 +17,7 @@
"clear_input_value": "清空输入",
"click_contextual_preview": "点击查看上下文预览",
"click_to_add_url": "输入文件链接",
"completion_finish_close": "连接断开",
"completion_finish_close": "请求关闭",
"completion_finish_content_filter": "触发安全风控",
"completion_finish_function_call": "函数调用",
"completion_finish_length": "超出回复限制",

View File

@ -19,6 +19,8 @@ import { useFileUpload } from '../hooks/useFileUpload';
import ComplianceTip from '@/components/common/ComplianceTip/index';
import { useToast } from '@fastgpt/web/hooks/useToast';
import VoiceInput, { type VoiceInputComponentRef } from './VoiceInput';
import MyBox from '@fastgpt/web/components/common/MyBox';
import { postStopV2Chat } from '@/web/core/chat/api';
const InputGuideBox = dynamic(() => import('./InputGuideBox'));
@ -124,6 +126,19 @@ const ChatInput = ({
},
[TextareaDom, canSendMessage, fileList, onSendMessage, replaceFiles]
);
const { runAsync: handleStop, loading: isStopping } = useRequest2(async () => {
try {
if (isChatting) {
await postStopV2Chat({
appId,
chatId,
outLinkAuthData
}).catch();
}
} finally {
onStop();
}
});
const RenderTextarea = useMemo(
() => (
@ -329,7 +344,9 @@ const ChatInput = ({
{/* Send Button Container */}
<Flex alignItems={'center'} w={[8, 9]} h={[8, 9]} borderRadius={'lg'}>
<Flex
<MyBox
isLoading={isStopping}
display={'flex'}
alignItems={'center'}
justifyContent={'center'}
w={[7, 9]}
@ -343,7 +360,7 @@ const ChatInput = ({
onClick={(e) => {
e.stopPropagation();
if (isChatting) {
return onStop();
return handleStop();
}
return handleSend();
}}
@ -355,7 +372,7 @@ const ChatInput = ({
<MyIcon name={'core/chat/sendFill'} {...iconSize} color={'white'} />
</MyTooltip>
)}
</Flex>
</MyBox>
</Flex>
</Flex>
</Flex>
@ -370,12 +387,13 @@ const ChatInput = ({
whisperConfig?.open,
inputValue,
t,
isStopping,
isChatting,
canSendMessage,
onOpenSelectFile,
onSelectFile,
handleSend,
onStop
handleStop
]);
const activeStyles: FlexProps = {

View File

@ -432,10 +432,10 @@ const ChatBox = ({
}, [questionGuide, appId, chatId, outLinkAuthData, scrollToBottom]);
/* Abort chat completions, questionGuide */
const abortRequest = useMemoizedFn((signal: string = 'stop') => {
chatController.current?.abort(signal);
questionGuideController.current?.abort(signal);
pluginController.current?.abort(signal);
const abortRequest = useMemoizedFn((reason: string = 'stop') => {
chatController.current?.abort(new Error(reason));
questionGuideController.current?.abort(new Error(reason));
pluginController.current?.abort(new Error(reason));
});
/**
@ -463,8 +463,7 @@ const ChatBox = ({
}
// Abort the previous request
abortRequest();
questionGuideController.current?.abort('stop');
questionGuideController.current?.abort(new Error('stop'));
text = text.trim();
@ -605,16 +604,18 @@ const ChatBox = ({
newChatHistories = state.map((item, index) => {
if (index !== state.length - 1) return item;
// Check node response error
const responseData = mergeChatResponseData(item.responseData || []);
const err =
responseData[responseData.length - 1]?.error ||
responseData[responseData.length - 1]?.errorText;
if (err) {
toast({
title: t(getErrText(err)),
status: 'warning'
});
// Check node response error
if (!abortSignal?.signal?.aborted) {
const err =
responseData[responseData.length - 1]?.error ||
responseData[responseData.length - 1]?.errorText;
if (err) {
toast({
title: t(getErrText(err)),
status: 'warning'
});
}
}
return {
@ -1184,7 +1185,7 @@ const ChatBox = ({
) : (
<ChatInput
onSendMessage={sendPrompt}
onStop={() => chatController.current?.abort('stop')}
onStop={() => abortRequest('stop')}
TextareaDom={TextareaDom}
resetInputVal={resetInputVal}
chatForm={chatForm}
@ -1206,7 +1207,7 @@ const ChatBox = ({
<ChatInput
onSendMessage={sendPrompt}
onStop={() => chatController.current?.abort('stop')}
onStop={() => abortRequest('stop')}
TextareaDom={TextareaDom}
resetInputVal={resetInputVal}
chatForm={chatForm}

View File

@ -181,6 +181,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
/* start process */
const { flowResponses, assistantResponses, system_memories, newVariables, durationSeconds } =
await dispatchWorkFlow({
apiVersion: 'v2',
res,
lang: getLocale(req),
requestOrigin: req.headers.origin,
@ -209,7 +210,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
stream: true,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES,
workflowStreamResponse: workflowResponseWrite,
version: 'v2',
responseDetail: true
});

View File

@ -11,7 +11,7 @@ import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants
import { getLastInteractiveValue } from '@fastgpt/global/core/workflow/runtime/utils';
import { getLocale } from '@fastgpt/service/common/middle/i18n';
import { createChatUsageRecord } from '@fastgpt/service/support/wallet/usage/controller';
import { clone } from 'lodash';
import { getNanoid } from '@fastgpt/global/common/string/tools';
async function handler(
req: NextApiRequest,
@ -73,6 +73,7 @@ async function handler(
tmbId: app.tmbId
},
runningUserInfo: await getRunningUserInfoByTmbId(tmbId),
chatId: getNanoid(),
runtimeNodes: nodes,
runtimeEdges: edges,
defaultSkipNodeQueue: skipNodeQueue,

View File

@ -278,6 +278,8 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
showNodeStatus
});
const saveChatId = chatId || getNanoid(24);
/* start flow controller */
const {
flowResponses,
@ -289,6 +291,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
} = await (async () => {
if (app.version === 'v2') {
return dispatchWorkFlow({
apiVersion: 'v1',
res,
lang: getLocale(req),
requestOrigin: req.headers.origin,
@ -304,7 +307,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
runningUserInfo: await getRunningUserInfoByTmbId(tmbId),
uid: String(outLinkUserId || tmbId),
chatId,
chatId: saveChatId,
responseChatItemId,
runtimeNodes,
runtimeEdges: storeEdges2RuntimeEdges(edges, interactive),
@ -351,7 +354,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
memories: system_memories
};
const saveChatId = chatId || getNanoid(24);
const params: SaveChatProps = {
chatId: saveChatId,
appId: app._id,

View File

@ -278,6 +278,8 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
showNodeStatus
});
const saveChatId = chatId || getNanoid(24);
/* start flow controller */
const {
flowResponses,
@ -289,6 +291,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
} = await (async () => {
if (app.version === 'v2') {
return dispatchWorkFlow({
apiVersion: 'v2',
res,
lang: getLocale(req),
requestOrigin: req.headers.origin,
@ -304,7 +307,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
runningUserInfo: await getRunningUserInfoByTmbId(tmbId),
uid: String(outLinkUserId || tmbId),
chatId,
chatId: saveChatId,
responseChatItemId,
runtimeNodes,
runtimeEdges: storeEdges2RuntimeEdges(edges, interactive),
@ -317,7 +320,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
retainDatasetCite,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES,
workflowStreamResponse: workflowResponseWrite,
version: 'v2',
responseAllData,
responseDetail
});
@ -354,7 +356,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
memories: system_memories
};
const saveChatId = chatId || getNanoid(24);
const params: SaveChatProps = {
chatId: saveChatId,
appId: app._id,

View File

@ -0,0 +1,36 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { NextAPI } from '@/service/middleware/entry';
import { authChatCrud } from '@/service/support/permission/auth/chat';
import {
setAgentRuntimeStop,
waitForWorkflowComplete
} from '@fastgpt/service/core/workflow/dispatch/workflowStatus';
import { StopV2ChatSchema, type StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api';
async function handler(req: NextApiRequest, res: NextApiResponse): Promise<StopV2ChatResponse> {
const { appId, chatId, outLinkAuthData } = StopV2ChatSchema.parse(req.body);
await authChatCrud({
req,
authToken: true,
authApiKey: true,
appId,
chatId,
...outLinkAuthData
});
// 设置停止状态
await setAgentRuntimeStop({
appId,
chatId
});
// 等待工作流完成 (最多等待 5 秒)
await waitForWorkflowComplete({ appId, chatId, timeout: 5000 });
return {
success: true
};
}
export default NextAPI(handler);

View File

@ -24,6 +24,7 @@ import type {
UpdateFavouriteAppParamsType
} from '@fastgpt/global/openapi/core/chat/favourite/api';
import type { ChatFavouriteAppType } from '@fastgpt/global/core/chat/favouriteApp/type';
import type { StopV2ChatParams } from '@fastgpt/global/openapi/core/chat/api';
/**
*
@ -76,3 +77,6 @@ export const updateFavouriteAppTags = (data: { id: string; tags: string[] }[]) =
export const deleteFavouriteApp = (data: { id: string }) =>
DELETE<null>('/proApi/core/chat/setting/favourite/delete', data);
/* Chat controller */
export const postStopV2Chat = (data: StopV2ChatParams) => POST('/v2/chat/stop', data);

View File

@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users';
import { Call } from '@test/utils/request';
import { describe, expect, it, beforeEach } from 'vitest';
describe.sequential('closeCustom api test', () => {
describe('closeCustom api test', () => {
let testUser: Awaited<ReturnType<typeof getUser>>;
let appId: string;
let chatId: string;

View File

@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users';
import { Call } from '@test/utils/request';
import { describe, expect, it, beforeEach } from 'vitest';
describe.sequential('updateFeedbackReadStatus api test', () => {
describe('updateFeedbackReadStatus api test', () => {
let testUser: Awaited<ReturnType<typeof getUser>>;
let appId: string;
let chatId: string;

View File

@ -14,7 +14,7 @@ import { getUser } from '@test/datas/users';
import { Call } from '@test/utils/request';
import { describe, expect, it, beforeEach } from 'vitest';
describe.sequential('updateUserFeedback api test', () => {
describe('updateUserFeedback api test', () => {
let testUser: Awaited<ReturnType<typeof getUser>>;
let appId: string;
let chatId: string;

View File

@ -3,6 +3,7 @@ import type { Model, Schema } from 'mongoose';
import { Mongoose } from 'mongoose';
export const MONGO_URL = process.env.MONGODB_URI ?? '';
const maxConnecting = Math.max(30, Number(process.env.DB_MAX_LINK || 20));
declare global {
var mongodb: Mongoose | undefined;
@ -52,49 +53,30 @@ export async function connectMongo(db: Mongoose, url: string): Promise<Mongoose>
db.connection.removeAllListeners('disconnected');
db.set('strictQuery', 'throw');
db.connection.on('error', async (error: any) => {
addLog.error('mongo error', error);
try {
if (db.connection.readyState !== 0) {
await db.disconnect();
await delay(1000);
await connectMongo(db, url);
}
} catch (_error) {
addLog.error('Error during reconnection:', _error);
}
db.connection.on('error', async (error) => {
console.error('mongo error', error);
});
db.connection.on('connected', async () => {
console.log('mongo connected');
});
db.connection.on('disconnected', async () => {
addLog.warn('mongo disconnected');
try {
if (db.connection.readyState !== 0) {
await db.disconnect();
await delay(1000);
await connectMongo(db, url);
}
} catch (_error) {
addLog.error('Error during reconnection:', _error);
}
console.error('mongo disconnected');
});
const options = {
await db.connect(url, {
bufferCommands: true,
maxPoolSize: Math.max(30, Number(process.env.MONGO_MAX_LINK || 20)),
minPoolSize: 20,
connectTimeoutMS: 60000,
waitQueueTimeoutMS: 60000,
socketTimeoutMS: 60000,
maxIdleTimeMS: 300000,
retryWrites: true,
retryReads: true,
serverSelectionTimeoutMS: 60000,
heartbeatFrequencyMS: 20000,
maxStalenessSeconds: 120
};
await db.connect(url, options);
addLog.info('mongo connected');
maxConnecting: maxConnecting, // 最大连接数: 防止连接数过多时无法满足需求
maxPoolSize: maxConnecting, // 最大连接池大小: 防止连接池过大时无法满足需求
minPoolSize: 20, // 最小连接数: 20,防止连接数过少时无法满足需求
connectTimeoutMS: 60000, // 连接超时: 60秒,防止连接失败时长时间阻塞
waitQueueTimeoutMS: 60000, // 等待队列超时: 60秒,防止等待队列长时间阻塞
socketTimeoutMS: 60000, // Socket 超时: 60秒,防止Socket连接失败时长时间阻塞
maxIdleTimeMS: 300000, // 空闲连接超时: 5分钟,防止空闲连接长时间占用资源
retryWrites: true, // 重试写入: 重试写入失败的操作
retryReads: true, // 重试读取: 重试读取失败的操作
serverSelectionTimeoutMS: 10000, // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞
heartbeatFrequencyMS: 5000 // 5s 进行一次健康检查
});
return db;
} catch (error) {
addLog.error('Mongo connect error', error);

View File

@ -0,0 +1,117 @@
import { describe, test, expect, beforeEach } from 'vitest';
import {
setAgentRuntimeStop,
delAgentRuntimeStopSign,
shouldWorkflowStop,
waitForWorkflowComplete
} from '@fastgpt/service/core/workflow/dispatch/workflowStatus';
describe('Workflow Status Redis Functions', () => {
const testAppId = 'test_app_123';
const testChatId = 'test_chat_456';
beforeEach(async () => {
// 清理测试数据
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
});
test('should set stopping sign', async () => {
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
test('should return false for non-existent status', async () => {
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should detect stopping status', async () => {
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
test('should return false after deleting stop sign', async () => {
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
await delAgentRuntimeStopSign({
appId: testAppId,
chatId: testChatId
});
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should wait for workflow completion', async () => {
// 设置初始停止标志
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
// 模拟异步完成(删除停止标志)
setTimeout(async () => {
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
}, 500);
// 等待完成waitForWorkflowComplete 现在是 void 返回
await waitForWorkflowComplete({
appId: testAppId,
chatId: testChatId,
timeout: 2000
});
// 验证停止标志已被删除
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should timeout when waiting too long', async () => {
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
// 等待超时(不删除标志)
await waitForWorkflowComplete({
appId: testAppId,
chatId: testChatId,
timeout: 100
});
// 验证停止标志仍然存在
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
test('should delete workflow stop sign', async () => {
await setAgentRuntimeStop({
appId: testAppId,
chatId: testChatId
});
await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId });
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(false);
});
test('should handle concurrent stop sign operations', async () => {
// 并发设置停止标志
await Promise.all([
setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }),
setAgentRuntimeStop({ appId: testAppId, chatId: testChatId })
]);
// 停止标志应该存在
const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId });
expect(shouldStop).toBe(true);
});
});

View File

@ -1,73 +1,233 @@
import { vi } from 'vitest';
// In-memory storage for mock Redis
const createRedisStorage = () => {
const storage = new Map<string, any>();
const expiryMap = new Map<string, number>();
// Check and remove expired keys
const isExpired = (key: string): boolean => {
const expiry = expiryMap.get(key);
if (expiry && expiry < Date.now()) {
storage.delete(key);
expiryMap.delete(key);
return true;
}
return false;
};
return {
get: (key: string) => {
if (isExpired(key)) return null;
return storage.get(key) ?? null;
},
set: (key: string, value: any, exMode?: string, exValue?: number) => {
storage.set(key, value);
// Handle EX (seconds) and PX (milliseconds) options
if (exMode === 'EX' && typeof exValue === 'number') {
expiryMap.set(key, Date.now() + exValue * 1000);
} else if (exMode === 'PX' && typeof exValue === 'number') {
expiryMap.set(key, Date.now() + exValue);
}
return 'OK';
},
del: (...keys: string[]) => {
let deletedCount = 0;
keys.forEach((key) => {
if (storage.has(key)) {
storage.delete(key);
expiryMap.delete(key);
deletedCount++;
}
});
return deletedCount;
},
exists: (...keys: string[]) => {
let count = 0;
keys.forEach((key) => {
if (!isExpired(key) && storage.has(key)) count++;
});
return count;
},
clear: () => {
storage.clear();
expiryMap.clear();
}
};
};
// Create a comprehensive mock Redis client factory
const createMockRedisClient = () => ({
// Connection methods
on: vi.fn().mockReturnThis(),
connect: vi.fn().mockResolvedValue(undefined),
disconnect: vi.fn().mockResolvedValue(undefined),
quit: vi.fn().mockResolvedValue('OK'),
duplicate: vi.fn(function (this: any) {
return createMockRedisClient();
}),
const createMockRedisClient = () => {
const redisStorage = createRedisStorage();
// Key-value operations
get: vi.fn().mockResolvedValue(null),
set: vi.fn().mockResolvedValue('OK'),
del: vi.fn().mockResolvedValue(1),
exists: vi.fn().mockResolvedValue(0),
keys: vi.fn().mockResolvedValue([]),
scan: vi.fn().mockImplementation((cursor) => {
// 模拟多次迭代的场景
if (cursor === '0') return ['100', ['key1', 'key2']];
if (cursor === '100') return ['0', ['key3']];
return ['0', []];
}),
return {
// Connection methods
on: vi.fn().mockReturnThis(),
connect: vi.fn().mockResolvedValue(undefined),
disconnect: vi.fn().mockResolvedValue(undefined),
quit: vi.fn().mockResolvedValue('OK'),
duplicate: vi.fn(function (this: any) {
return createMockRedisClient();
}),
// Hash operations
hget: vi.fn().mockResolvedValue(null),
hset: vi.fn().mockResolvedValue(1),
hdel: vi.fn().mockResolvedValue(1),
hgetall: vi.fn().mockResolvedValue({}),
hmset: vi.fn().mockResolvedValue('OK'),
// Key-value operations with actual storage
get: vi.fn().mockImplementation((key: string) => Promise.resolve(redisStorage.get(key))),
set: vi
.fn()
.mockImplementation((key: string, value: any, exMode?: string, exValue?: number) =>
Promise.resolve(redisStorage.set(key, value, exMode, exValue))
),
del: vi
.fn()
.mockImplementation((...keys: string[]) => Promise.resolve(redisStorage.del(...keys))),
exists: vi
.fn()
.mockImplementation((...keys: string[]) => Promise.resolve(redisStorage.exists(...keys))),
keys: vi.fn().mockResolvedValue([]),
scan: vi.fn().mockImplementation((cursor) => {
// 模拟多次迭代的场景
if (cursor === '0') return ['100', ['key1', 'key2']];
if (cursor === '100') return ['0', ['key3']];
return ['0', []];
}),
// Expiry operations
expire: vi.fn().mockResolvedValue(1),
ttl: vi.fn().mockResolvedValue(-1),
expireat: vi.fn().mockResolvedValue(1),
// Hash operations
hget: vi.fn().mockResolvedValue(null),
hset: vi.fn().mockResolvedValue(1),
hdel: vi.fn().mockResolvedValue(1),
hgetall: vi.fn().mockResolvedValue({}),
hmset: vi.fn().mockResolvedValue('OK'),
// Increment operations
incr: vi.fn().mockResolvedValue(1),
decr: vi.fn().mockResolvedValue(1),
incrby: vi.fn().mockResolvedValue(1),
decrby: vi.fn().mockResolvedValue(1),
incrbyfloat: vi.fn().mockResolvedValue(1),
// Expiry operations
expire: vi.fn().mockResolvedValue(1),
ttl: vi.fn().mockResolvedValue(-1),
expireat: vi.fn().mockResolvedValue(1),
// Server commands
info: vi.fn().mockResolvedValue(''),
ping: vi.fn().mockResolvedValue('PONG'),
flushdb: vi.fn().mockResolvedValue('OK'),
// Increment operations
incr: vi.fn().mockResolvedValue(1),
decr: vi.fn().mockResolvedValue(1),
incrby: vi.fn().mockResolvedValue(1),
decrby: vi.fn().mockResolvedValue(1),
incrbyfloat: vi.fn().mockResolvedValue(1),
// List operations
lpush: vi.fn().mockResolvedValue(1),
rpush: vi.fn().mockResolvedValue(1),
lpop: vi.fn().mockResolvedValue(null),
rpop: vi.fn().mockResolvedValue(null),
llen: vi.fn().mockResolvedValue(0),
// Server commands
info: vi.fn().mockResolvedValue(''),
ping: vi.fn().mockResolvedValue('PONG'),
flushdb: vi.fn().mockResolvedValue('OK'),
// Set operations
sadd: vi.fn().mockResolvedValue(1),
srem: vi.fn().mockResolvedValue(1),
smembers: vi.fn().mockResolvedValue([]),
sismember: vi.fn().mockResolvedValue(0),
// List operations
lpush: vi.fn().mockResolvedValue(1),
rpush: vi.fn().mockResolvedValue(1),
lpop: vi.fn().mockResolvedValue(null),
rpop: vi.fn().mockResolvedValue(null),
llen: vi.fn().mockResolvedValue(0),
// pipeline
pipeline: vi.fn(() => ({
del: vi.fn().mockReturnThis(),
unlink: vi.fn().mockReturnThis(),
exec: vi.fn().mockResolvedValue([])
}))
});
// Set operations
sadd: vi.fn().mockResolvedValue(1),
srem: vi.fn().mockResolvedValue(1),
smembers: vi.fn().mockResolvedValue([]),
sismember: vi.fn().mockResolvedValue(0),
// pipeline
pipeline: vi.fn(() => ({
del: vi.fn().mockReturnThis(),
unlink: vi.fn().mockReturnThis(),
exec: vi.fn().mockResolvedValue([])
})),
// Internal storage for testing purposes
_storage: redisStorage
};
};
// Shared global Redis storage for all mock clients
const globalRedisStorage = createRedisStorage();
// Create mock client with shared storage
const createSharedMockRedisClient = () => {
return {
// Connection methods
on: vi.fn().mockReturnThis(),
connect: vi.fn().mockResolvedValue(undefined),
disconnect: vi.fn().mockResolvedValue(undefined),
quit: vi.fn().mockResolvedValue('OK'),
duplicate: vi.fn(function (this: any) {
return createSharedMockRedisClient();
}),
// Key-value operations with shared storage
get: vi.fn().mockImplementation((key: string) => Promise.resolve(globalRedisStorage.get(key))),
set: vi
.fn()
.mockImplementation((key: string, value: any, exMode?: string, exValue?: number) =>
Promise.resolve(globalRedisStorage.set(key, value, exMode, exValue))
),
del: vi
.fn()
.mockImplementation((...keys: string[]) => Promise.resolve(globalRedisStorage.del(...keys))),
exists: vi
.fn()
.mockImplementation((...keys: string[]) =>
Promise.resolve(globalRedisStorage.exists(...keys))
),
keys: vi.fn().mockResolvedValue([]),
scan: vi.fn().mockImplementation((cursor) => {
if (cursor === '0') return ['100', ['key1', 'key2']];
if (cursor === '100') return ['0', ['key3']];
return ['0', []];
}),
// Hash operations
hget: vi.fn().mockResolvedValue(null),
hset: vi.fn().mockResolvedValue(1),
hdel: vi.fn().mockResolvedValue(1),
hgetall: vi.fn().mockResolvedValue({}),
hmset: vi.fn().mockResolvedValue('OK'),
// Expiry operations
expire: vi.fn().mockResolvedValue(1),
ttl: vi.fn().mockResolvedValue(-1),
expireat: vi.fn().mockResolvedValue(1),
// Increment operations
incr: vi.fn().mockResolvedValue(1),
decr: vi.fn().mockResolvedValue(1),
incrby: vi.fn().mockResolvedValue(1),
decrby: vi.fn().mockResolvedValue(1),
incrbyfloat: vi.fn().mockResolvedValue(1),
// Server commands
info: vi.fn().mockResolvedValue(''),
ping: vi.fn().mockResolvedValue('PONG'),
flushdb: vi.fn().mockImplementation(() => {
globalRedisStorage.clear();
return Promise.resolve('OK');
}),
// List operations
lpush: vi.fn().mockResolvedValue(1),
rpush: vi.fn().mockResolvedValue(1),
lpop: vi.fn().mockResolvedValue(null),
rpop: vi.fn().mockResolvedValue(null),
llen: vi.fn().mockResolvedValue(0),
// Set operations
sadd: vi.fn().mockResolvedValue(1),
srem: vi.fn().mockResolvedValue(1),
smembers: vi.fn().mockResolvedValue([]),
sismember: vi.fn().mockResolvedValue(0),
// pipeline
pipeline: vi.fn(() => ({
del: vi.fn().mockReturnThis(),
unlink: vi.fn().mockReturnThis(),
exec: vi.fn().mockResolvedValue([])
})),
// Internal storage for testing purposes
_storage: globalRedisStorage
};
};
// Mock Redis connections to prevent connection errors in tests
vi.mock('@fastgpt/service/common/redis', async (importOriginal) => {
@ -75,20 +235,20 @@ vi.mock('@fastgpt/service/common/redis', async (importOriginal) => {
return {
...actual,
newQueueRedisConnection: vi.fn(createMockRedisClient),
newWorkerRedisConnection: vi.fn(createMockRedisClient),
newQueueRedisConnection: vi.fn(createSharedMockRedisClient),
newWorkerRedisConnection: vi.fn(createSharedMockRedisClient),
getGlobalRedisConnection: vi.fn(() => {
if (!global.mockRedisClient) {
global.mockRedisClient = createMockRedisClient();
global.mockRedisClient = createSharedMockRedisClient();
}
return global.mockRedisClient;
}),
initRedisClient: vi.fn().mockResolvedValue(createMockRedisClient())
initRedisClient: vi.fn().mockResolvedValue(createSharedMockRedisClient())
};
});
// Initialize global.redisClient with mock before any module imports it
// This prevents getGlobalRedisConnection() from creating a real Redis client
if (!global.redisClient) {
global.redisClient = createMockRedisClient() as any;
global.redisClient = createSharedMockRedisClient() as any;
}

View File

@ -49,8 +49,6 @@ beforeEach(async () => {
onTestFinished(async () => {
clean();
// Wait for any ongoing transactions and operations to complete
await delay(500);
// Ensure all sessions are closed before dropping database
try {
@ -62,9 +60,6 @@ beforeEach(async () => {
// Ignore errors during cleanup
console.warn('Error during test cleanup:', error);
}
// Additional delay to prevent lock contention between tests
await delay(100);
});
});

View File

@ -20,8 +20,10 @@ export default defineConfig({
outputFile: 'test-results.json',
setupFiles: 'test/setup.ts',
globalSetup: 'test/globalSetup.ts',
// fileParallelism: false,
maxConcurrency: 5,
// File-level execution: serial (one file at a time to avoid MongoDB conflicts)
fileParallelism: false,
// Test-level execution within a file: parallel (up to 5 concurrent tests)
maxConcurrency: 10,
pool: 'threads',
include: [
'test/test.ts',
@ -31,6 +33,7 @@ export default defineConfig({
'projects/marketplace/test/**/*.test.ts'
],
testTimeout: 20000,
hookTimeout: 30000,
reporters: ['github-actions', 'default']
}
});