From 4f95f6867e512704559db28dad60da0a9a34613c Mon Sep 17 00:00:00 2001 From: heheer Date: Sat, 20 Dec 2025 13:11:02 +0800 Subject: [PATCH] app delete queue (#6122) * app delete queue * test * perf: del app queue * perf: log * perf: query * perf: retry del s3 * fix: ts * perf: add job * redis retry * perf: mq check * update log * perf: mq concurrency * perf: error check * perf: mq * perf: init model --------- Co-authored-by: archer <545436317@qq.com> --- document/content/docs/upgrading/4-14/4145.mdx | 1 + document/data/doc-last-modified.json | 2 +- packages/global/core/app/type.d.ts | 3 + packages/service/common/bullmq/index.ts | 36 +- packages/service/common/redis/index.ts | 58 +- packages/service/common/s3/buckets/base.ts | 57 +- packages/service/common/s3/controller.ts | 27 +- packages/service/common/s3/mq.ts | 143 +++-- packages/service/common/s3/sources/avatar.ts | 2 +- packages/service/core/ai/config/utils.ts | 65 +- packages/service/core/app/controller.ts | 153 ++--- packages/service/core/app/delete/index.ts | 41 ++ packages/service/core/app/delete/processor.ts | 77 +++ packages/service/core/app/schema.ts | 7 + projects/app/src/pages/api/admin/initv4145.ts | 67 ++ projects/app/src/pages/api/core/app/del.ts | 36 +- projects/app/src/pages/api/core/app/list.ts | 5 +- .../app/src/service/common/bullmq/index.ts | 2 + projects/app/test/api/core/app/delete.test.ts | 582 ++++++++++++++++++ .../core/chat/feedback/closeCustom.test.ts | 2 +- .../feedback/updateFeedbackReadStatus.test.ts | 2 +- .../chat/feedback/updateUserFeedback.test.ts | 2 +- 22 files changed, 1100 insertions(+), 270 deletions(-) create mode 100644 packages/service/core/app/delete/index.ts create mode 100644 packages/service/core/app/delete/processor.ts create mode 100644 projects/app/src/pages/api/admin/initv4145.ts create mode 100644 projects/app/test/api/core/app/delete.test.ts diff --git a/document/content/docs/upgrading/4-14/4145.mdx b/document/content/docs/upgrading/4-14/4145.mdx index 95df5d844..5166bc062 100644 --- a/document/content/docs/upgrading/4-14/4145.mdx +++ b/document/content/docs/upgrading/4-14/4145.mdx @@ -11,6 +11,7 @@ description: 'FastGPT V4.14.5 更新说明' ## ⚙️ 优化 1. 优化获取 redis 所有 key 的逻辑,避免大量获取时导致阻塞。 +2. Redis 和 MQ 的重连逻辑优化。 ## 🐛 修复 diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index a034479c5..e7ecdc900 100644 --- a/document/data/doc-last-modified.json +++ b/document/data/doc-last-modified.json @@ -120,7 +120,7 @@ "document/content/docs/upgrading/4-14/4142.mdx": "2025-11-18T19:27:14+08:00", "document/content/docs/upgrading/4-14/4143.mdx": "2025-11-26T20:52:05+08:00", "document/content/docs/upgrading/4-14/4144.mdx": "2025-12-16T14:56:04+08:00", - "document/content/docs/upgrading/4-14/4145.mdx": "2025-12-18T23:25:48+08:00", + "document/content/docs/upgrading/4-14/4145.mdx": "2025-12-19T00:08:30+08:00", "document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00", diff --git a/packages/global/core/app/type.d.ts b/packages/global/core/app/type.d.ts index 7f7b897a1..7f2e789d4 100644 --- a/packages/global/core/app/type.d.ts +++ b/packages/global/core/app/type.d.ts @@ -59,6 +59,9 @@ export type AppSchema = { inited?: boolean; /** @deprecated */ teamTags: string[]; + + // 软删除字段 + deleteTime?: Date | null; }; export type AppListItemType = { diff --git a/packages/service/common/bullmq/index.ts b/packages/service/common/bullmq/index.ts index b0d3d1961..2ee7df802 100644 --- a/packages/service/common/bullmq/index.ts +++ b/packages/service/common/bullmq/index.ts @@ -8,6 +8,7 @@ import { } from 'bullmq'; import { addLog } from '../system/log'; import { newQueueRedisConnection, newWorkerRedisConnection } from '../redis'; +import { delay } from '@fastgpt/global/common/system/utils'; const defaultWorkerOpts: Omit = { removeOnComplete: { @@ -25,6 +26,7 @@ export enum QueueNames { // Delete Queue datasetDelete = 'datasetDelete', + appDelete = 'appDelete', // @deprecated websiteSync = 'websiteSync' } @@ -77,15 +79,41 @@ export function getWorker( const newWorker = new Worker(name.toString(), processor, { connection: newWorkerRedisConnection(), ...defaultWorkerOpts, + // BullMQ Worker important settings + lockDuration: 600000, // 10 minutes for large file operations + stalledInterval: 30000, // Check for stalled jobs every 30s + maxStalledCount: 3, // Move job to failed after 1 stall (default behavior) ...opts }); // default error handler, to avoid unhandled exceptions - newWorker.on('error', (error) => { - addLog.error(`MQ Worker [${name}]: ${error.message}`, error); + newWorker.on('error', async (error) => { + addLog.error(`MQ Worker error`, { + message: error.message, + data: { name } + }); + await newWorker.close(); }); - newWorker.on('failed', (jobId, error) => { - addLog.error(`MQ Worker [${name}]: ${error.message}`, error); + // Critical: Worker has been closed - remove from pool + newWorker.on('closed', async () => { + addLog.error(`MQ Worker [${name}] closed unexpectedly`, { + data: { + name, + message: 'Worker will need to be manually restarted' + } + }); + try { + await delay(1000); + workers.delete(name); + getWorker(name, processor, opts); + } catch (error) {} }); + + newWorker.on('paused', async () => { + addLog.warn(`MQ Worker [${name}] paused`); + await delay(1000); + newWorker.resume(); + }); + workers.set(name, newWorker); return newWorker; } diff --git a/packages/service/common/redis/index.ts b/packages/service/common/redis/index.ts index c0973e35e..701613d50 100644 --- a/packages/service/common/redis/index.ts +++ b/packages/service/common/redis/index.ts @@ -3,26 +3,54 @@ import Redis from 'ioredis'; const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379'; +// Base Redis options for connection reliability +const REDIS_BASE_OPTION = { + // Retry strategy: exponential backoff with unlimited retries for stability + retryStrategy: (times: number) => { + // Never give up retrying to ensure worker keeps running + const delay = Math.min(times * 50, 2000); // Max 2s between retries + if (times > 10) { + addLog.error(`[Redis connection failed] attempt ${times}, will keep retrying...`); + } else { + addLog.warn(`Redis reconnecting... attempt ${times}, delay ${delay}ms`); + } + return delay; // Always return a delay to keep retrying + }, + // Reconnect on specific errors (Redis master-slave switch, network issues) + reconnectOnError: (err: any) => { + const reconnectErrors = ['READONLY', 'ECONNREFUSED', 'ETIMEDOUT', 'ECONNRESET']; + const shouldReconnect = reconnectErrors.some((errType) => err.message.includes(errType)); + if (shouldReconnect) { + addLog.warn(`Redis reconnecting due to error: ${err.message}`); + } + return shouldReconnect; + }, + // Connection timeout + connectTimeout: 10000, // 10 seconds + // Enable offline queue to buffer commands when disconnected + enableOfflineQueue: true +}; + export const newQueueRedisConnection = () => { - const redis = new Redis(REDIS_URL); - redis.on('connect', () => { - console.log('Redis connected'); + const redis = new Redis(REDIS_URL, { + ...REDIS_BASE_OPTION, + // Limit retries for queue operations + maxRetriesPerRequest: 3 }); redis.on('error', (error) => { - console.error('Redis connection error', error); + addLog.error('[Redis Queue connection error]', error); }); return redis; }; export const newWorkerRedisConnection = () => { const redis = new Redis(REDIS_URL, { + ...REDIS_BASE_OPTION, + // BullMQ requires maxRetriesPerRequest: null for blocking operations maxRetriesPerRequest: null }); - redis.on('connect', () => { - console.log('Redis connected'); - }); redis.on('error', (error) => { - console.error('Redis connection error', error); + addLog.error('[Redis Worker connection error]', error); }); return redis; }; @@ -31,13 +59,17 @@ export const FASTGPT_REDIS_PREFIX = 'fastgpt:'; export const getGlobalRedisConnection = () => { if (global.redisClient) return global.redisClient; - global.redisClient = new Redis(REDIS_URL, { keyPrefix: FASTGPT_REDIS_PREFIX }); - - global.redisClient.on('connect', () => { - addLog.info('Redis connected'); + global.redisClient = new Redis(REDIS_URL, { + ...REDIS_BASE_OPTION, + keyPrefix: FASTGPT_REDIS_PREFIX, + maxRetriesPerRequest: 3 }); + global.redisClient.on('error', (error) => { - addLog.error('Redis connection error', error); + addLog.error('[Redis Global connection error]', error); + }); + global.redisClient.on('close', () => { + addLog.warn('[Redis Global connection closed]'); }); return global.redisClient; diff --git a/packages/service/common/s3/buckets/base.ts b/packages/service/common/s3/buckets/base.ts index 2df0a485e..82c44f4c9 100644 --- a/packages/service/common/s3/buckets/base.ts +++ b/packages/service/common/s3/buckets/base.ts @@ -1,4 +1,11 @@ -import { Client, type RemoveOptions, type CopyConditions, S3Error } from 'minio'; +import { + Client, + type RemoveOptions, + type CopyConditions, + S3Error, + InvalidObjectNameError, + InvalidXMLError +} from 'minio'; import { type CreatePostPresignedUrlOptions, type CreatePostPresignedUrlParams, @@ -17,6 +24,25 @@ import { type Readable } from 'node:stream'; import { type UploadFileByBufferParams, UploadFileByBufferSchema } from '../type'; import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; +// Check if the error is a "file not found" type error, which should be treated as success +export const isFileNotFoundError = (error: any): boolean => { + if (error instanceof S3Error) { + // Handle various "not found" error codes + return ( + error.code === 'NoSuchKey' || + error.code === 'InvalidObjectName' || + error.message === 'Not Found' || + error.message === + 'The request signature we calculated does not match the signature you provided. Check your key and signing method.' || + error.message.includes('Object name contains unsupported characters.') + ); + } + if (error instanceof InvalidObjectNameError || error instanceof InvalidXMLError) { + return true; + } + return false; +}; + export class S3BaseBucket { private _client: Client; private _externalClient: Client | undefined; @@ -94,7 +120,7 @@ export class S3BaseBucket { temporary: false } }); - await this.delete(from); + await this.removeObject(from); } async copy({ @@ -120,24 +146,17 @@ export class S3BaseBucket { return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions); } - async delete(objectKey: string, options?: RemoveOptions): Promise { - try { - if (!objectKey) return Promise.resolve(); - - // 把连带的 parsed 数据一起删除 - const fileParsedPrefix = `${path.dirname(objectKey)}/${path.basename(objectKey, path.extname(objectKey))}-parsed`; - await this.addDeleteJob({ prefix: fileParsedPrefix }); - - return await this.client.removeObject(this.bucketName, objectKey, options); - } catch (error) { - if (error instanceof S3Error) { - if (error.code === 'InvalidObjectName') { - addLog.warn(`${this.bucketName} delete object not found: ${objectKey}`, error); - return Promise.resolve(); - } + async removeObject(objectKey: string, options?: RemoveOptions): Promise { + return this.client.removeObject(this.bucketName, objectKey, options).catch((err) => { + if (isFileNotFoundError(err)) { + return Promise.resolve(); } - return Promise.reject(error); - } + addLog.error(`[S3 delete error]`, { + message: err.message, + data: { code: err.code, key: objectKey } + }); + throw err; + }); } // 列出文件 diff --git a/packages/service/common/s3/controller.ts b/packages/service/common/s3/controller.ts index f66281a52..be1ffefc9 100644 --- a/packages/service/common/s3/controller.ts +++ b/packages/service/common/s3/controller.ts @@ -27,26 +27,7 @@ export async function clearExpiredMinioFiles() { const bucket = global.s3BucketMap[bucketName]; if (bucket) { - await bucket.delete(file.minioKey); - - if (!file.minioKey.includes('-parsed/')) { - try { - const dir = path.dirname(file.minioKey); - const basename = path.basename(file.minioKey); - const ext = path.extname(basename); - - if (ext) { - const nameWithoutExt = path.basename(basename, ext); - const parsedPrefix = `${dir}/${nameWithoutExt}-parsed`; - - await bucket.addDeleteJob({ prefix: parsedPrefix }); - addLog.info(`Scheduled deletion of parsed images: ${parsedPrefix}`); - } - } catch (error) { - addLog.debug(`Failed to schedule parsed images deletion for ${file.minioKey}`); - } - } - + await bucket.addDeleteJob({ key: file.minioKey }); await MongoS3TTL.deleteOne({ _id: file._id }); success++; @@ -57,12 +38,6 @@ export async function clearExpiredMinioFiles() { addLog.warn(`Bucket not found: ${file.bucketName}`); } } catch (error) { - if ( - error instanceof S3Error && - error.message.includes('Object name contains unsupported characters.') - ) { - await MongoS3TTL.deleteOne({ _id: file._id }); - } fail++; addLog.error(`Failed to delete minio file: ${file.minioKey}`, error); } diff --git a/packages/service/common/s3/mq.ts b/packages/service/common/s3/mq.ts index 6ab32b984..c4712c7dc 100644 --- a/packages/service/common/s3/mq.ts +++ b/packages/service/common/s3/mq.ts @@ -1,7 +1,8 @@ import { getQueue, getWorker, QueueNames } from '../bullmq'; -import pLimit from 'p-limit'; -import { retryFn } from '@fastgpt/global/common/system/utils'; import { addLog } from '../system/log'; +import path from 'path'; +import { batchRun } from '@fastgpt/global/common/system/utils'; +import { isFileNotFoundError, type S3BaseBucket } from './buckets/base'; export type S3MQJobData = { key?: string; @@ -10,89 +11,99 @@ export type S3MQJobData = { bucketName: string; }; +const jobOption = { + attempts: 10, + removeOnFail: { + count: 10000, // 保留10000个失败任务 + age: 14 * 24 * 60 * 60 // 14 days + }, + removeOnComplete: true, + backoff: { + delay: 2000, + type: 'exponential' + } +}; export const addS3DelJob = async (data: S3MQJobData): Promise => { const queue = getQueue(QueueNames.s3FileDelete); - - await queue.add( - 'delete-s3-files', - { ...data }, - { - attempts: 3, - removeOnFail: false, - removeOnComplete: true, - backoff: { - delay: 2000, - type: 'exponential' - } + const jobId = (() => { + if (data.key) { + return data.key; } - ); + if (data.keys) { + return undefined; + } + if (data.prefix) { + return data.prefix; + } + throw new Error('Invalid s3 delete job data'); + })(); + await queue.add('delete-s3-files', data, { jobId, ...jobOption }); +}; + +const prefixDel = async (bucket: S3BaseBucket, prefix: string) => { + addLog.debug(`[S3 delete] delete prefix: ${prefix}`); + let tasks: Promise[] = []; + return new Promise(async (resolve, reject) => { + const stream = bucket.listObjectsV2(prefix, true); + stream.on('data', (file) => { + if (!file.name) return; + tasks.push(bucket.removeObject(file.name)); + }); + + stream.on('end', async () => { + if (tasks.length === 0) { + return resolve(); + } + + const results = await Promise.allSettled(tasks); + const failed = results.some((r) => r.status === 'rejected'); + if (failed) { + addLog.error(`[S3 delete] delete prefix failed: ${prefix}`); + reject('Some deletes failed'); + } + resolve(); + }); + + stream.on('error', (err) => { + if (isFileNotFoundError(err)) { + return resolve(); + } + addLog.error(`[S3 delete] delete prefix: ${prefix} error`, err); + reject(err); + }); + }); }; export const startS3DelWorker = async () => { return getWorker( QueueNames.s3FileDelete, async (job) => { - const { prefix, bucketName, key, keys } = job.data; - const limit = pLimit(10); - const bucket = s3BucketMap[bucketName]; + let { prefix, bucketName, key, keys } = job.data; + const bucket = global.s3BucketMap[bucketName]; if (!bucket) { - return Promise.reject(`Bucket not found: ${bucketName}`); + addLog.error(`Bucket not found: ${bucketName}`); + return; } if (key) { - addLog.info(`[S3 delete] delete key: ${key}`); - await bucket.delete(key); - addLog.info(`[S3 delete] delete key: ${key} success`); + keys = [key]; } if (keys) { - addLog.info(`[S3 delete] delete keys: ${keys.length}`); - const tasks: Promise[] = []; - for (const key of keys) { - const p = limit(() => retryFn(() => bucket.delete(key))); - tasks.push(p); - } - await Promise.all(tasks); - addLog.info(`[S3 delete] delete keys: ${keys.length} success`); + addLog.debug(`[S3 delete] delete keys: ${keys.length}`); + await batchRun(keys, async (key) => { + await bucket.removeObject(key); + // Delete parsed + if (!key.includes('-parsed/')) { + const fileParsedPrefix = `${path.dirname(key)}/${path.basename(key, path.extname(key))}-parsed`; + await prefixDel(bucket, fileParsedPrefix); + } + }); } if (prefix) { - addLog.info(`[S3 delete] delete prefix: ${prefix}`); - const tasks: Promise[] = []; - return new Promise(async (resolve, reject) => { - const stream = bucket.listObjectsV2(prefix, true); - stream.on('data', async (file) => { - if (!file.name) return; - - const p = limit(() => - // 因为封装的 delete 方法里,包含前缀删除,这里不能再使用,避免循环。 - retryFn(() => bucket.client.removeObject(bucket.bucketName, file.name)) - ); - tasks.push(p); - }); - - stream.on('end', async () => { - try { - const results = await Promise.allSettled(tasks); - const failed = results.filter((r) => r.status === 'rejected'); - if (failed.length > 0) { - addLog.error(`[S3 delete] delete prefix: ${prefix} failed`); - reject('Some deletes failed'); - } - addLog.info(`[S3 delete] delete prefix: ${prefix} success`); - resolve(); - } catch (err) { - addLog.error(`[S3 delete] delete prefix: ${prefix} error`, err); - reject(err); - } - }); - - stream.on('error', (err) => { - addLog.error(`[S3 delete] delete prefix: ${prefix} error`, err); - reject(err); - }); - }); + await prefixDel(bucket, prefix); } }, { - concurrency: 1 + concurrency: 3 } ); }; diff --git a/packages/service/common/s3/sources/avatar.ts b/packages/service/common/s3/sources/avatar.ts index 0d9ef031d..17469b6ab 100644 --- a/packages/service/common/s3/sources/avatar.ts +++ b/packages/service/common/s3/sources/avatar.ts @@ -42,7 +42,7 @@ class S3AvatarSource extends S3PublicBucket { async deleteAvatar(avatar: string, session?: ClientSession): Promise { const key = avatar.slice(this.prefix.length); await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucketName }, session); - await this.delete(key); + await this.removeObject(key); } async refreshAvatar(newAvatar?: string, oldAvatar?: string, session?: ClientSession) { diff --git a/packages/service/core/ai/config/utils.ts b/packages/service/core/ai/config/utils.ts index 3e53ef8d6..4d0b7947b 100644 --- a/packages/service/core/ai/config/utils.ts +++ b/packages/service/core/ai/config/utils.ts @@ -120,42 +120,40 @@ export const loadSystemModels = async (init = false, language = 'en') => { ]); // Load system model from local - await Promise.all( - systemModels.map(async (model) => { - const mergeObject = (obj1: any, obj2: any) => { - if (!obj1 && !obj2) return undefined; - const formatObj1 = typeof obj1 === 'object' ? obj1 : {}; - const formatObj2 = typeof obj2 === 'object' ? obj2 : {}; - return { ...formatObj1, ...formatObj2 }; - }; + systemModels.forEach((model) => { + const mergeObject = (obj1: any, obj2: any) => { + if (!obj1 && !obj2) return undefined; + const formatObj1 = typeof obj1 === 'object' ? obj1 : {}; + const formatObj2 = typeof obj2 === 'object' ? obj2 : {}; + return { ...formatObj1, ...formatObj2 }; + }; - const dbModel = dbModels.find((item) => item.model === model.model); - const provider = getModelProvider(dbModel?.metadata?.provider || model.provider, language); + const dbModel = dbModels.find((item) => item.model === model.model); + const provider = getModelProvider(dbModel?.metadata?.provider || model.provider, language); - const modelData: any = { - ...model, - ...dbModel?.metadata, - provider: provider.id, - avatar: provider.avatar, - type: dbModel?.metadata?.type || model.type, - isCustom: false, + const modelData: any = { + ...model, + ...dbModel?.metadata, + provider: provider.id, + avatar: provider.avatar, + type: dbModel?.metadata?.type || model.type, + isCustom: false, - ...(model.type === ModelTypeEnum.llm && { - maxResponse: model.maxTokens || 4000 - }), + ...(model.type === ModelTypeEnum.llm && { + maxResponse: model.maxTokens || 4000 + }), - ...(model.type === ModelTypeEnum.llm && dbModel?.metadata?.type === ModelTypeEnum.llm - ? { - maxResponse: dbModel?.metadata?.maxResponse ?? model.maxTokens ?? 4000, - defaultConfig: mergeObject(model.defaultConfig, dbModel?.metadata?.defaultConfig), - fieldMap: mergeObject(model.fieldMap, dbModel?.metadata?.fieldMap), - maxTokens: undefined - } - : {}) - }; - pushModel(modelData); - }) - ); + ...(model.type === ModelTypeEnum.llm && dbModel?.metadata?.type === ModelTypeEnum.llm + ? { + maxResponse: dbModel?.metadata?.maxResponse ?? model.maxTokens ?? 4000, + defaultConfig: mergeObject(model.defaultConfig, dbModel?.metadata?.defaultConfig), + fieldMap: mergeObject(model.fieldMap, dbModel?.metadata?.fieldMap), + maxTokens: undefined + } + : {}) + }; + pushModel(modelData); + }); // Custom model(Not in system config) dbModels.forEach((dbModel) => { @@ -240,8 +238,7 @@ export const loadSystemModels = async (init = false, language = 'en') => { ); } catch (error) { console.error('Load models error', error); - // @ts-ignore - global.systemModelList = undefined; + return Promise.reject(error); } }; diff --git a/packages/service/core/app/controller.ts b/packages/service/core/app/controller.ts index e6e546cac..fb07552a2 100644 --- a/packages/service/core/app/controller.ts +++ b/packages/service/core/app/controller.ts @@ -4,12 +4,10 @@ import { FlowNodeInputTypeEnum, FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant'; -import { AppFolderTypeList } from '@fastgpt/global/core/app/constants'; import { MongoApp } from './schema'; import type { StoreNodeItemType } from '@fastgpt/global/core/workflow/type/node'; import { encryptSecretValue, storeSecretValue } from '../../common/secret/utils'; import { SystemToolSecretInputTypeEnum } from '@fastgpt/global/core/app/tool/systemTool/constants'; -import { type ClientSession } from '../../common/mongo'; import { MongoEvaluation } from './evaluation/evalSchema'; import { removeEvaluationJob } from './evaluation/mq'; import { MongoChatItem } from '../chat/chatItemSchema'; @@ -23,10 +21,12 @@ import { MongoChatSetting } from '../chat/setting/schema'; import { MongoResourcePermission } from '../../support/permission/schema'; import { PerResourceTypeEnum } from '@fastgpt/global/support/permission/constant'; import { removeImageByPath } from '../../common/file/image/controller'; -import { mongoSessionRun } from '../../common/mongo/sessionRun'; import { MongoAppLogKeys } from './logs/logkeysSchema'; import { MongoChatItemResponse } from '../chat/chatItemResponseSchema'; import { getS3ChatSource } from '../../common/s3/sources/chat'; +import { MongoAppChatLog } from './logs/chatLogsSchema'; +import { MongoAppRegistration } from '../../support/appRegistration/schema'; +import { MongoMcpKey } from '../../support/mcp/schema'; export const beforeUpdateAppFormat = ({ nodes }: { nodes?: StoreNodeItemType[] }) => { if (!nodes) return; @@ -136,112 +136,71 @@ export const getAppBasicInfoByIds = async ({ teamId, ids }: { teamId: string; id })); }; -export const onDelOneApp = async ({ - teamId, - appId, - session +export const deleteAppDataProcessor = async ({ + app, + teamId }: { + app: AppSchema; teamId: string; - appId: string; - session?: ClientSession; }) => { - const apps = await findAppAndAllChildren({ - teamId, - appId, - fields: '_id avatar' - }); + const appId = String(app._id); - const deletedAppIds = apps - .filter((app) => !AppFolderTypeList.includes(app.type)) - .map((app) => String(app._id)); - - // Remove eval job - const evalJobs = await MongoEvaluation.find( - { - appId: { $in: apps.map((app) => app._id) } - }, - '_id' - ).lean(); - await Promise.all(evalJobs.map((evalJob) => removeEvaluationJob(evalJob._id))); - - const del = async (app: AppSchema, session: ClientSession) => { - const appId = String(app._id); + // 1. 删除应用头像 + await removeImageByPath(app.avatar); + // 2. 删除聊天记录和S3文件 + await getS3ChatSource().deleteChatFilesByPrefix({ appId }); + await MongoAppChatLog.deleteMany({ teamId, appId }); + await MongoChatItemResponse.deleteMany({ appId }); + await MongoChatItem.deleteMany({ appId }); + await MongoChat.deleteMany({ appId }); + // 3. 删除应用相关数据(使用事务) + { // 删除分享链接 - await MongoOutLink.deleteMany({ - appId - }).session(session); - // Openapi - await MongoOpenApi.deleteMany({ - appId - }).session(session); - - // delete version - await MongoAppVersion.deleteMany({ - appId - }).session(session); - - await MongoChatInputGuide.deleteMany({ - appId - }).session(session); - + await MongoOutLink.deleteMany({ appId }); + // 删除 OpenAPI 配置 + await MongoOpenApi.deleteMany({ appId }); + // 删除应用版本 + await MongoAppVersion.deleteMany({ appId }); + // 删除聊天输入引导 + await MongoChatInputGuide.deleteMany({ appId }); // 删除精选应用记录 - await MongoChatFavouriteApp.deleteMany({ - teamId, - appId - }).session(session); - + await MongoChatFavouriteApp.deleteMany({ teamId, appId }); // 从快捷应用中移除对应应用 - await MongoChatSetting.updateMany( - { teamId }, - { $pull: { quickAppIds: { id: String(appId) } } } - ).session(session); - - // Del permission + await MongoChatSetting.updateMany({ teamId }, { $pull: { quickAppIds: { $in: [appId] } } }); + // 删除权限记录 await MongoResourcePermission.deleteMany({ resourceType: PerResourceTypeEnum.app, teamId, resourceId: appId - }).session(session); - - await MongoAppLogKeys.deleteMany({ - appId - }).session(session); - - // delete app - await MongoApp.deleteOne( - { - _id: appId - }, - { session } - ); - - // Delete avatar - await removeImageByPath(app.avatar, session); - }; - - // Delete chats - for await (const app of apps) { - const appId = String(app._id); - await getS3ChatSource().deleteChatFilesByPrefix({ appId }); - await MongoChatItemResponse.deleteMany({ - appId - }); - await MongoChatItem.deleteMany({ - appId - }); - await MongoChat.deleteMany({ - appId }); + // 删除日志密钥 + await MongoAppLogKeys.deleteMany({ appId }); + + // 删除应用注册记录 + await MongoAppRegistration.deleteMany({ appId }); + // 删除应用从MCP key apps数组中移除 + await MongoMcpKey.updateMany({ teamId, 'apps.appId': appId }, { $pull: { apps: { appId } } }); + + // 删除应用本身 + await MongoApp.deleteOne({ _id: appId }); } - - for await (const app of apps) { - if (session) { - await del(app, session); - } - - await mongoSessionRun((session) => del(app, session)); - } - - return deletedAppIds; +}; + +export const deleteAppsImmediate = async ({ + teamId, + appIds +}: { + teamId: string; + appIds: string[]; +}) => { + // Remove eval job + const evalJobs = await MongoEvaluation.find( + { + teamId, + appId: { $in: appIds } + }, + '_id' + ).lean(); + await Promise.all(evalJobs.map((evalJob) => removeEvaluationJob(evalJob._id))); }; diff --git a/packages/service/core/app/delete/index.ts b/packages/service/core/app/delete/index.ts new file mode 100644 index 000000000..465a17ef4 --- /dev/null +++ b/packages/service/core/app/delete/index.ts @@ -0,0 +1,41 @@ +import { getQueue, getWorker, QueueNames } from '../../../common/bullmq'; +import { appDeleteProcessor } from './processor'; + +export type AppDeleteJobData = { + teamId: string; + appId: string; +}; + +// 创建工作进程 +export const initAppDeleteWorker = () => { + return getWorker(QueueNames.appDelete, appDeleteProcessor, { + concurrency: 1, // 确保同时只有1个删除任务 + removeOnFail: { + age: 30 * 24 * 60 * 60 // 保留30天失败记录 + } + }); +}; + +// 添加删除任务 +export const addAppDeleteJob = (data: AppDeleteJobData) => { + // 创建删除队列 + const appDeleteQueue = getQueue(QueueNames.appDelete, { + defaultJobOptions: { + attempts: 10, + backoff: { + type: 'exponential', + delay: 5000 + }, + removeOnComplete: true, + removeOnFail: { age: 30 * 24 * 60 * 60 } // 保留30天失败记录 + } + }); + + const jobId = `${data.teamId}:${data.appId}`; + + // Use jobId to automatically prevent duplicate deletion tasks (BullMQ feature) + return appDeleteQueue.add('deleteapp', data, { + jobId, + delay: 1000 // Delay 1 second to ensure API response completes + }); +}; diff --git a/packages/service/core/app/delete/processor.ts b/packages/service/core/app/delete/processor.ts new file mode 100644 index 000000000..264fcd269 --- /dev/null +++ b/packages/service/core/app/delete/processor.ts @@ -0,0 +1,77 @@ +import type { Processor } from 'bullmq'; +import type { AppDeleteJobData } from './index'; +import { findAppAndAllChildren, deleteAppDataProcessor } from '../controller'; +import { addLog } from '../../../common/system/log'; +import { batchRun } from '@fastgpt/global/common/system/utils'; +import type { AppSchema } from '@fastgpt/global/core/app/type'; +import { MongoApp } from '../schema'; + +const deleteApps = async ({ teamId, apps }: { teamId: string; apps: AppSchema[] }) => { + const results = await batchRun( + apps, + async (app) => { + await deleteAppDataProcessor({ app, teamId }); + }, + 3 + ); + + return results.flat(); +}; + +export const appDeleteProcessor: Processor = async (job) => { + const { teamId, appId } = job.data; + const startTime = Date.now(); + + addLog.info(`[App Delete] Start deleting app: ${appId} for team: ${teamId}`); + + try { + // 1. 查找应用及其所有子应用 + const apps = await findAppAndAllChildren({ + teamId, + appId + }); + + if (!apps || apps.length === 0) { + addLog.warn(`[App Delete] App not found: ${appId}`); + return; + } + + // 2. 安全检查:确保所有要删除的应用都已标记为 deleteTime + const markedForDelete = await MongoApp.find( + { + _id: { $in: apps.map((app) => app._id) }, + teamId, + deleteTime: { $ne: null } + }, + { _id: 1 } + ).lean(); + + if (markedForDelete.length !== apps.length) { + addLog.warn( + `[App Delete] Safety check: ${markedForDelete.length}/${apps.length} apps marked for deletion`, + { + markedAppIds: markedForDelete.map((app) => app._id), + totalAppIds: apps.map((app) => app._id) + } + ); + } + + const childrenLen = apps.length - 1; + const appIds = apps.map((app) => app._id); + + // 3. 执行真正的删除操作(只删除已经标记为 deleteTime 的数据) + await deleteApps({ + teamId, + apps + }); + + addLog.info(`[App Delete] Successfully deleted app: ${appId} and ${childrenLen} children`, { + duration: Date.now() - startTime, + totalApps: appIds.length, + appIds + }); + } catch (error: any) { + addLog.error(`[App Delete] Failed to delete app: ${appId}`, error); + throw error; + } +}; diff --git a/packages/service/core/app/schema.ts b/packages/service/core/app/schema.ts index 2516ba945..2cf83b770 100644 --- a/packages/service/core/app/schema.ts +++ b/packages/service/core/app/schema.ts @@ -119,6 +119,12 @@ const AppSchema = new Schema( inited: Boolean, teamTags: { type: [String] + }, + + // 软删除标记字段 + deleteTime: { + type: Date, + default: null // null表示未删除,有值表示删除时间 } }, { @@ -138,5 +144,6 @@ AppSchema.index( ); // Admin count AppSchema.index({ type: 1 }); +AppSchema.index({ deleteTime: 1 }); export const MongoApp = getMongoModel(AppCollectionName, AppSchema); diff --git a/projects/app/src/pages/api/admin/initv4145.ts b/projects/app/src/pages/api/admin/initv4145.ts new file mode 100644 index 000000000..060bca0ca --- /dev/null +++ b/projects/app/src/pages/api/admin/initv4145.ts @@ -0,0 +1,67 @@ +import { NextAPI } from '@/service/middleware/entry'; +import { batchRun } from '@fastgpt/global/common/system/utils'; +import { getQueue, QueueNames } from '@fastgpt/service/common/bullmq'; +import type { S3MQJobData } from '@fastgpt/service/common/s3/mq'; +import { addLog } from '@fastgpt/service/common/system/log'; +import type { ApiRequestProps, ApiResponseType } from '@fastgpt/service/type/next'; +import { authCert } from '@fastgpt/service/support/permission/auth/common'; + +export type ResponseType = { + message: string; + retriedCount: number; + failedCount: number; +}; + +async function handler( + req: ApiRequestProps, + res: ApiResponseType +): Promise { + await authCert({ req, authRoot: true }); + const queue = getQueue(QueueNames.s3FileDelete); + + // Get all failed jobs and retry them + const failedJobs = await queue.getFailed(); + console.log(`Found ${failedJobs.length} failed jobs`); + + let retriedCount = 0; + + await batchRun( + failedJobs, + async (job) => { + addLog.debug(`Retrying job with 3 new attempts`, { retriedCount }); + try { + // Remove old job and recreate with new attempts + const jobData = job.data; + await job.remove(); + + // Add new job with 3 more attempts + await queue.add('delete-s3-files', jobData, { + attempts: 10, + removeOnFail: { + count: 10000, // 保留10000个失败任务 + age: 14 * 24 * 60 * 60 // 14 days + }, + removeOnComplete: true, + backoff: { + delay: 2000, + type: 'exponential' + } + }); + + retriedCount++; + console.log(`Retried job ${job.id} with 3 new attempts`); + } catch (error) { + console.error(`Failed to retry job ${job.id}:`, error); + } + }, + 100 + ); + + return { + message: 'Successfully retried all failed S3 delete jobs with 3 new attempts', + retriedCount, + failedCount: failedJobs.length + }; +} + +export default NextAPI(handler); diff --git a/projects/app/src/pages/api/core/app/del.ts b/projects/app/src/pages/api/core/app/del.ts index 843212b0b..b3e22ac8f 100644 --- a/projects/app/src/pages/api/core/app/del.ts +++ b/projects/app/src/pages/api/core/app/del.ts @@ -2,7 +2,11 @@ import type { NextApiRequest, NextApiResponse } from 'next'; import { authApp } from '@fastgpt/service/support/permission/app/auth'; import { NextAPI } from '@/service/middleware/entry'; import { OwnerPermissionVal } from '@fastgpt/global/support/permission/constant'; -import { onDelOneApp } from '@fastgpt/service/core/app/controller'; +import { findAppAndAllChildren } from '@fastgpt/service/core/app/controller'; +import { MongoApp } from '@fastgpt/service/core/app/schema'; +import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; +import { addAppDeleteJob } from '@fastgpt/service/core/app/delete'; +import { deleteAppsImmediate } from '@fastgpt/service/core/app/controller'; import { pushTrack } from '@fastgpt/service/common/middle/tracks/utils'; import { addAuditLog } from '@fastgpt/service/support/user/audit/util'; import { AuditEventEnum } from '@fastgpt/global/support/user/audit/constants'; @@ -23,7 +27,31 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { per: OwnerPermissionVal }); - const deletedAppIds = await onDelOneApp({ teamId, appId }); + const deleteAppsList = await findAppAndAllChildren({ + teamId, + appId + }); + + await mongoSessionRun(async (session) => { + // Mark app as deleted + await MongoApp.updateMany( + { _id: deleteAppsList.map((app) => app._id), teamId }, + { deleteTime: new Date() }, + { session } + ); + + // Stop background tasks immediately + await deleteAppsImmediate({ + teamId, + appIds: deleteAppsList.map((app) => app._id) + }); + + // Add to delete queue for async cleanup + await addAppDeleteJob({ + teamId, + appId + }); + }); (async () => { addAuditLog({ @@ -40,7 +68,9 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { // Tracks pushTrack.countAppNodes({ teamId, tmbId, uid: userId, appId }); - return deletedAppIds; + return deleteAppsList + .filter((app) => !['folder'].includes(app.type)) + .map((app) => String(app._id)); } export default NextAPI(handler); diff --git a/projects/app/src/pages/api/core/app/list.ts b/projects/app/src/pages/api/core/app/list.ts index f700be855..949b86f63 100644 --- a/projects/app/src/pages/api/core/app/list.ts +++ b/projects/app/src/pages/api/core/app/list.ts @@ -11,7 +11,6 @@ import { type ApiRequestProps } from '@fastgpt/service/type/next'; import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type'; import { parseParentIdInMongo } from '@fastgpt/global/common/parentFolder/utils'; import { AppFolderTypeList, AppTypeEnum } from '@fastgpt/global/core/app/constants'; -import { AppDefaultRoleVal } from '@fastgpt/global/support/permission/app/constant'; import { authApp } from '@fastgpt/service/support/permission/app/auth'; import { authUserPer } from '@fastgpt/service/support/permission/user/auth'; import { replaceRegChars } from '@fastgpt/global/common/string/tools'; @@ -97,7 +96,7 @@ async function handler(req: ApiRequestProps): Promise { if (getRecentlyChat) { return { - // get all chat app, excluding hidden apps + // get all chat app, excluding hidden apps and deleted apps teamId, type: { $in: [AppTypeEnum.workflow, AppTypeEnum.simple, AppTypeEnum.workflowTool] } }; @@ -160,7 +159,7 @@ async function handler(req: ApiRequestProps): Promise { addLog.info('Init BullMQ Workers...'); initS3MQWorker(); initDatasetDeleteWorker(); + initAppDeleteWorker(); }; diff --git a/projects/app/test/api/core/app/delete.test.ts b/projects/app/test/api/core/app/delete.test.ts new file mode 100644 index 000000000..d5a44b411 --- /dev/null +++ b/projects/app/test/api/core/app/delete.test.ts @@ -0,0 +1,582 @@ +import { describe, expect, it, beforeEach, vi, afterEach } from 'vitest'; +import type { AppDeleteJobData } from '@fastgpt/service/core/app/delete'; +import { addAppDeleteJob } from '@fastgpt/service/core/app/delete'; +import { appDeleteProcessor } from '@fastgpt/service/core/app/delete/processor'; +import handler from '@/pages/api/core/app/del'; +import { MongoApp } from '@fastgpt/service/core/app/schema'; +import { AppTypeEnum } from '@fastgpt/global/core/app/constants'; +import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; +import { getRootUser } from '@test/datas/users'; +import { Call } from '@test/utils/request'; +import { MongoChat } from '@fastgpt/service/core/chat/chatSchema'; +import { MongoChatItem } from '@fastgpt/service/core/chat/chatItemSchema'; +import { MongoChatItemResponse } from '@fastgpt/service/core/chat/chatItemResponseSchema'; +import { MongoOutLink } from '@fastgpt/service/support/outLink/schema'; +import { MongoOpenApi } from '@fastgpt/service/support/openapi/schema'; +import { MongoAppVersion } from '@fastgpt/service/core/app/version/schema'; +import { MongoChatInputGuide } from '@fastgpt/service/core/chat/inputGuide/schema'; +import { MongoChatFavouriteApp } from '@fastgpt/service/core/chat/favouriteApp/schema'; +import { MongoChatSetting } from '@fastgpt/service/core/chat/setting/schema'; +import { MongoResourcePermission } from '@fastgpt/service/support/permission/schema'; +import { PerResourceTypeEnum } from '@fastgpt/global/support/permission/constant'; +import { MongoAppLogKeys } from '@fastgpt/service/core/app/logs/logkeysSchema'; +import { FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant'; +import { ChatSourceEnum } from '@fastgpt/global/core/chat/constants'; + +// Mock dependencies for queue functionality +vi.mock('@fastgpt/service/common/bullmq', () => ({ + getQueue: vi.fn(), + getWorker: vi.fn(), + QueueNames: { + appDelete: 'app-delete' + } +})); + +// Mock S3 and image removal functions +vi.mock('@fastgpt/service/common/s3/sources/chat', () => ({ + getS3ChatSource: () => ({ + deleteChatFilesByPrefix: vi.fn().mockResolvedValue(undefined) + }) +})); + +vi.mock('@fastgpt/service/common/file/image/controller', () => ({ + removeImageByPath: vi.fn().mockResolvedValue(undefined) +})); + +// Import mocked modules for type access +import { getQueue, getWorker, QueueNames } from '@fastgpt/service/common/bullmq'; + +const mockGetQueue = vi.mocked(getQueue); +const mockGetWorker = vi.mocked(getWorker); + +describe('App Delete Queue', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('addAppDeleteJob', () => { + it('should add job to queue with correct parameters', async () => { + const mockQueue = { + add: vi.fn().mockResolvedValue({ id: 'job-123' }) + }; + mockGetQueue.mockReturnValue(mockQueue as any); + + const jobData: AppDeleteJobData = { + teamId: 'team-123', + appId: 'app-123' + }; + + const result = await addAppDeleteJob(jobData); + + expect(mockGetQueue).toHaveBeenCalledWith(QueueNames.appDelete, { + defaultJobOptions: { + attempts: 10, + backoff: { + type: 'exponential', + delay: 5000 + }, + removeOnComplete: true, + removeOnFail: { age: 30 * 24 * 60 * 60 } + } + }); + + expect(mockQueue.add).toHaveBeenCalledWith('deleteapp', jobData, { + jobId: 'team-123:app-123', + delay: 1000 + }); + + expect(result).toEqual({ id: 'job-123' }); + }); + + it('should use correct jobId format for preventing duplicates', async () => { + const mockQueue = { + add: vi.fn().mockResolvedValue({ id: 'job-456' }) + }; + mockGetQueue.mockReturnValue(mockQueue as any); + + const jobData: AppDeleteJobData = { + teamId: 'team-xyz', + appId: 'app-abc' + }; + + await addAppDeleteJob(jobData); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'deleteapp', + jobData, + expect.objectContaining({ + jobId: 'team-xyz:app-abc' + }) + ); + }); + }); +}); + +describe('App Delete API Integration', () => { + let rootUser: any; + + beforeEach(async () => { + // Get root user for testing + rootUser = await getRootUser(); + }); + + it('should successfully delete an app and mark it for deletion', async () => { + // Create a test app first + const testApp = await MongoApp.create({ + name: 'Test App for Deletion', + teamId: rootUser.teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + modules: [] + }); + + // Mock the queue to avoid actual background deletion + const mockQueue = { + add: vi.fn().mockResolvedValue({ id: 'job-123' }) + }; + mockGetQueue.mockReturnValue(mockQueue as any); + + // Call the delete API + const result = await Call(handler, { + auth: rootUser, + query: { appId: String(testApp._id) } + }); + + expect(result.code).toBe(200); + expect(Array.isArray(result.data)).toBe(true); + + // Verify the app is marked for deletion + const deletedApp = await MongoApp.findOne({ _id: testApp._id }); + expect(deletedApp?.deleteTime).not.toBeNull(); + + // Verify queue job was added + expect(mockQueue.add).toHaveBeenCalledWith( + 'deleteapp', + { + teamId: rootUser.teamId, + appId: String(testApp._id) + }, + { + jobId: `${rootUser.teamId}:${testApp._id}`, + delay: 1000 + } + ); + + // Cleanup + await MongoApp.deleteOne({ _id: testApp._id }); + }); + + it('should handle folder deletion correctly', async () => { + // Create a folder + const testFolder = await MongoApp.create({ + name: 'Test Folder', + teamId: rootUser.teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.folder, + parentId: null + }); + + // Create a child app in the folder + const childApp = await MongoApp.create({ + name: 'Child App', + teamId: rootUser.teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + parentId: testFolder._id, + modules: [] + }); + + // Mock the queue + const mockQueue = { + add: vi.fn().mockResolvedValue({ id: 'job-folder' }) + }; + mockGetQueue.mockReturnValue(mockQueue as any); + + // Call the delete API + const result = await Call(handler, { + auth: rootUser, + query: { appId: String(testFolder._id) } + }); + + expect(result.code).toBe(200); + expect(Array.isArray(result.data)).toBe(true); + + // Folders should not be included in the response + const deletedAppIds = result.data as string[]; + expect(deletedAppIds).not.toContain(String(testFolder._id)); + expect(deletedAppIds).toContain(String(childApp._id)); + + // Cleanup + await mongoSessionRun(async (session) => { + await MongoApp.deleteOne({ _id: testFolder._id }, { session }); + await MongoApp.deleteOne({ _id: childApp._id }, { session }); + }); + }); + + it('should handle non-existent app gracefully', async () => { + const nonExistentId = '507f1f77bcf86cd799439011'; + + const result = await Call(handler, { + auth: rootUser, + query: { appId: nonExistentId } + }); + + expect(result.code).toBe(500); + expect(result.error).toBe('appUnExist'); + }); +}); + +describe('App Delete Data Cleanup Verification', () => { + let rootUser: any; + let testApp: any; + let teamId: string; + let appId: string; + + beforeEach(async () => { + rootUser = await getRootUser(); + teamId = rootUser.teamId; + + // 创建测试应用 + testApp = await MongoApp.create({ + name: 'Test App for Full Deletion', + teamId: teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + modules: [ + { + flowPosition: { x: 100, y: 100 }, + inputs: [], + outputs: [], + avatar: '/test/avatar.png', + name: 'Test Module', + intro: 'Test module intro', + flowType: FlowNodeTypeEnum.chatNode, + version: '1.0' + } + ], + avatar: '/test/app-avatar.png' + }); + appId = String(testApp._id); + + // 创建相关测试数据 + await createAllRelatedTestData(appId, teamId); + }); + + afterEach(async () => { + // 清理测试数据,确保测试环境干净 + await cleanupTestData(appId, teamId); + }); + + describe('Complete Data Deletion Verification', () => { + it('should delete ALL related data when app deletion queue job is processed', async () => { + // 1. 验证测试数据创建成功 + await verifyTestDataExists(appId, teamId); + + // 2. 标记应用为删除状态(模拟 API 调用后的状态) + await MongoApp.updateOne({ _id: appId }, { deleteTime: new Date() }); + + // 3. 执行删除处理器(模拟队列任务执行) + const mockJob = { + data: { teamId, appId }, + id: 'test-job-id' + }; + + await appDeleteProcessor(mockJob); + + // 4. 验证所有相关数据都被删除 + await verifyAllDataDeleted(appId, teamId); + + // 5. 验证应用本身被删除 + const deletedApp = await MongoApp.findOne({ _id: appId }); + expect(deletedApp).toBeNull(); + }); + + it('should handle deletion of nested apps and their data', async () => { + // 创建父子应用结构 + const parentApp = await MongoApp.create({ + name: 'Parent App', + teamId: teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + modules: [] + }); + + const childApp = await MongoApp.create({ + name: 'Child App', + teamId: teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + parentId: parentApp._id, + modules: [] + }); + + // 为子应用创建相关数据 + await createAllRelatedTestData(String(childApp._id), teamId); + + // 标记父应用为删除状态 + await MongoApp.updateOne({ _id: parentApp._id }, { deleteTime: new Date() }); + + // 执行删除(应该级联删除子应用) + const mockJob = { + data: { teamId, appId: String(parentApp._id) }, + id: 'test-nested-job' + }; + + await appDeleteProcessor(mockJob); + + // 验证父应用和子应用都被删除 + expect(await MongoApp.countDocuments({ _id: parentApp._id })).toBe(0); + expect(await MongoApp.countDocuments({ _id: childApp._id })).toBe(0); + + // 验证子应用的相关数据也被删除 + await verifyAllDataDeleted(String(childApp._id), teamId); + + // 清理 + await MongoApp.deleteMany({ + _id: { $in: [parentApp._id, childApp._id] }, + teamId + }); + }); + + it('should handle batch deletion of multiple apps', async () => { + // 创建多个应用 + const app2 = await MongoApp.create({ + name: 'Test App 2', + teamId: teamId, + tmbId: rootUser.tmbId, + type: AppTypeEnum.simple, + modules: [] + }); + + const app2Id = String(app2._id); + await createAllRelatedTestData(app2Id, teamId); + + // 标记两个应用为删除状态 + await MongoApp.updateMany({ _id: { $in: [appId, app2Id] } }, { deleteTime: new Date() }); + + // 删除第一个应用 + const mockJob1 = { + data: { teamId, appId }, + id: 'test-batch-job-1' + }; + + await appDeleteProcessor(mockJob1); + + // 验证第一个应用的数据被删除 + await verifyAllDataDeleted(appId, teamId); + expect(await MongoApp.countDocuments({ _id: appId })).toBe(0); + + // 第二个应用的数据应该仍然存在 + await verifyTestDataExists(app2Id, teamId); + + // 清理 + await cleanupTestData(app2Id, teamId); + await MongoApp.deleteOne({ _id: app2Id }); + }); + }); + + // 辅助函数:创建所有相关测试数据 + async function createAllRelatedTestData(appId: string, teamId: string) { + const timestamp = Date.now(); + + // 1. 创建聊天记录 + await MongoChat.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + chatId: `test-chat-${timestamp}`, + title: 'Test Chat', + source: ChatSourceEnum.test, + customTitle: false, + variables: [], + status: 'finish' + }); + + // 2. 创建聊天项 + await MongoChatItem.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + chatId: `test-chat-${timestamp}`, + time: timestamp, + obj: 'Human', + value: 'Hello, this is a test message', + userBadFeedback: null, + adminBadFeedback: null + }); + + // 3. 创建聊天项响应 + await MongoChatItemResponse.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + chatItemId: `test-chat-item-${timestamp}`, + time: timestamp, + text: 'This is a test response', + q: 'Test question', + a: 'Test answer', + responseData: [] + }); + + // 4. 创建分享链接 + await MongoOutLink.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + name: 'Test Share Link', + shareId: `test_share_${timestamp}`, + type: 'share', + limit: 100, + immediateReturn: false + }); + + // 5. 创建 OpenAPI 配置 + await MongoOpenApi.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + apiKey: `test_api_${timestamp}`, + limit: 1000 + }); + + // 6. 创建应用版本 + await MongoAppVersion.create({ + appId: appId, + teamId: teamId, + tmbId: rootUser.tmbId, + version: '1.0.0', + nodes: [], + edges: [] + }); + + // 7. 创建聊天输入引导 + await MongoChatInputGuide.create({ + appId: appId, + text: 'Test input guide', + userKey: 'test_user', + createTime: timestamp + }); + + // 8. 创建精选应用记录 + await MongoChatFavouriteApp.create({ + teamId: teamId, + tmbId: rootUser.tmbId, + appId: appId, + name: 'Test Favourite App' + }); + + // 9. 创建快捷应用设置(包含此应用) + await MongoChatSetting.findOneAndUpdate( + { teamId: teamId }, + { + teamId: teamId, + tmbId: rootUser.tmbId, + quickAppIds: [appId] + }, + { upsert: true } + ); + + // 10. 创建权限记录 + await MongoResourcePermission.create({ + resourceType: PerResourceTypeEnum.app, + teamId: teamId, + tmbId: rootUser.tmbId, + resourceId: appId, + permission: 100 // 100 = write permission + }); + + // 11. 创建日志密钥 + await MongoAppLogKeys.create({ + appId: appId, + teamId: teamId, + key: `test_key_${timestamp}`, + createTime: timestamp + }); + } + + // 辅助函数:验证测试数据存在 + async function verifyTestDataExists(appId: string, teamId: string) { + expect(await MongoChat.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoChatItem.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoChatItemResponse.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoOutLink.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoOpenApi.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoAppVersion.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoChatInputGuide.countDocuments({ appId })).toBeGreaterThan(0); + expect(await MongoChatFavouriteApp.countDocuments({ teamId, appId })).toBeGreaterThan(0); + + const chatSettings = await MongoChatSetting.findOne({ teamId }); + expect(chatSettings?.quickAppIds).toContain(appId); + + expect( + await MongoResourcePermission.countDocuments({ + resourceType: PerResourceTypeEnum.app, + teamId, + resourceId: appId + }) + ).toBeGreaterThan(0); + + expect(await MongoAppLogKeys.countDocuments({ appId })).toBeGreaterThan(0); + } + + // 辅助函数:验证所有数据被删除 + async function verifyAllDataDeleted(appId: string, teamId: string) { + // 验证聊天相关数据被删除 + expect(await MongoChat.countDocuments({ appId })).toBe(0); + expect(await MongoChatItem.countDocuments({ appId })).toBe(0); + expect(await MongoChatItemResponse.countDocuments({ appId })).toBe(0); + + // 验证应用配置数据被删除 + expect(await MongoOutLink.countDocuments({ appId })).toBe(0); + expect(await MongoOpenApi.countDocuments({ appId })).toBe(0); + expect(await MongoAppVersion.countDocuments({ appId })).toBe(0); + expect(await MongoChatInputGuide.countDocuments({ appId })).toBe(0); + + // 验证用户相关数据被删除 + expect(await MongoChatFavouriteApp.countDocuments({ teamId, appId })).toBe(0); + + // 验证快捷应用设置被更新 + const chatSettings = await MongoChatSetting.findOne({ teamId }); + expect(chatSettings?.quickAppIds).not.toContain(appId); + + // 验证权限数据被删除 + expect( + await MongoResourcePermission.countDocuments({ + resourceType: PerResourceTypeEnum.app, + teamId, + resourceId: appId + }) + ).toBe(0); + + // 验证日志密钥被删除 + expect(await MongoAppLogKeys.countDocuments({ appId })).toBe(0); + } + + // 辅助函数:清理测试数据 + async function cleanupTestData(appId: string, teamId: string) { + await mongoSessionRun(async (session) => { + await MongoChat.deleteMany({ appId }, { session }); + await MongoChatItem.deleteMany({ appId }, { session }); + await MongoChatItemResponse.deleteMany({ appId }, { session }); + await MongoOutLink.deleteMany({ appId }, { session }); + await MongoOpenApi.deleteMany({ appId }, { session }); + await MongoAppVersion.deleteMany({ appId }, { session }); + await MongoChatInputGuide.deleteMany({ appId }, { session }); + await MongoChatFavouriteApp.deleteMany({ teamId, appId }, { session }); + await MongoResourcePermission.deleteMany( + { + resourceType: PerResourceTypeEnum.app, + teamId, + resourceId: appId + }, + { session } + ); + await MongoAppLogKeys.deleteMany({ appId }, { session }); + await MongoChatSetting.deleteMany({ teamId }, { session }); + }); + } +}); diff --git a/projects/app/test/api/core/chat/feedback/closeCustom.test.ts b/projects/app/test/api/core/chat/feedback/closeCustom.test.ts index b730c5730..a8a8d3204 100644 --- a/projects/app/test/api/core/chat/feedback/closeCustom.test.ts +++ b/projects/app/test/api/core/chat/feedback/closeCustom.test.ts @@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe('closeCustom api test', () => { +describe.sequential('closeCustom api test', () => { let testUser: Awaited>; let appId: string; let chatId: string; diff --git a/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts b/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts index ceac2d206..ddebacf9b 100644 --- a/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts +++ b/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts @@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe('updateFeedbackReadStatus api test', () => { +describe.sequential('updateFeedbackReadStatus api test', () => { let testUser: Awaited>; let appId: string; let chatId: string; diff --git a/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts b/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts index 0b8cf98ad..8163db530 100644 --- a/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts +++ b/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts @@ -14,7 +14,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe('updateUserFeedback api test', () => { +describe.sequential('updateUserFeedback api test', () => { let testUser: Awaited>; let appId: string; let chatId: string;