perf: remove dataset code (#6132)
Some checks are pending
Document deploy / sync-images (push) Waiting to run
Document deploy / generate-timestamp (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.cn suffix:cn]) (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.io suffix:io]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.cn kube_config:KUBE_CONFIG_CN suffix:cn]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.io kube_config:KUBE_CONFIG_IO suffix:io]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / get-vars (push) Waiting to run
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:amd64 runs-on:ubuntu-24.04]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:arm64 runs-on:ubuntu-24.04-arm]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / release-fastgpt-images (push) Blocked by required conditions

* stop design doc

* perf: init worker

* perf: remove dataset cide

* remove invalid doc
This commit is contained in:
Archer 2025-12-21 20:56:50 +08:00 committed by GitHub
parent 2fea73bb68
commit b0a48603f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 48 additions and 47 deletions

View File

@ -31,10 +31,10 @@ export const addAppDeleteJob = (data: AppDeleteJobData) => {
} }
}); });
const jobId = `${data.teamId}:${data.appId}`; const jobId = `${String(data.teamId)}:${String(data.appId)}`;
// Use jobId to automatically prevent duplicate deletion tasks (BullMQ feature) // Use jobId to automatically prevent duplicate deletion tasks (BullMQ feature)
return appDeleteQueue.add('deleteapp', data, { return appDeleteQueue.add('delete_app', data, {
jobId, jobId,
delay: 1000 // Delay 1 second to ensure API response completes delay: 1000 // Delay 1 second to ensure API response completes
}); });

View File

@ -68,7 +68,7 @@ export async function delDatasetRelevantData({
datasets, datasets,
session session
}: { }: {
datasets: DatasetSchemaType[]; datasets: { _id: string; teamId: string }[];
session: ClientSession; session: ClientSession;
}) { }) {
if (!datasets.length) return; if (!datasets.length) return;
@ -115,24 +115,6 @@ export async function delDatasetRelevantData({
// Delete vector data // Delete vector data
await deleteDatasetDataVector({ teamId, datasetIds }); await deleteDatasetDataVector({ teamId, datasetIds });
for (const datasetId of datasetIds) {
// Delete dataset_data_texts in batches by datasetId
await MongoDatasetDataText.deleteMany({
teamId,
datasetId
}).maxTimeMS(300000); // Reduce timeout for single batch
// Delete dataset_datas in batches by datasetId
await MongoDatasetData.deleteMany({
teamId,
datasetId
}).maxTimeMS(300000);
}
// Delete source: 兼容旧版的图片
await delCollectionRelatedSource({ collections });
// Delete vector data
await deleteDatasetDataVector({ teamId, datasetIds });
// delete collections // delete collections
await MongoDatasetCollection.deleteMany({ await MongoDatasetCollection.deleteMany({
teamId, teamId,

View File

@ -31,11 +31,11 @@ export const addDatasetDeleteJob = (data: DatasetDeleteJobData) => {
} }
}); });
const jobId = `${data.teamId}:${data.datasetId}`; const jobId = `${String(data.teamId)}:${String(data.datasetId)}`;
// 使用去重机制,避免重复删除 // 使用去重机制,避免重复删除
return datasetDeleteQueue.add(jobId, data, { return datasetDeleteQueue.add('delete_dataset', data, {
deduplication: { id: jobId }, jobId,
delay: 1000 // 延迟1秒执行确保API响应完成 delay: 1000 // 延迟1秒执行确保API响应完成
}); });
}; };

View File

@ -1,8 +1,7 @@
import type { Processor } from 'bullmq'; import type { Processor } from 'bullmq';
import type { DatasetDeleteJobData } from './index'; import { addDatasetDeleteJob, type DatasetDeleteJobData } from './index';
import { delDatasetRelevantData, findDatasetAndAllChildren } from '../controller'; import { delDatasetRelevantData, findDatasetAndAllChildren } from '../controller';
import { addLog } from '../../../common/system/log'; import { addLog } from '../../../common/system/log';
import type { DatasetSchemaType } from '@fastgpt/global/core/dataset/type';
import { MongoDatasetCollectionTags } from '../tag/schema'; import { MongoDatasetCollectionTags } from '../tag/schema';
import { removeDatasetSyncJobScheduler } from '../datasetSync'; import { removeDatasetSyncJobScheduler } from '../datasetSync';
import { mongoSessionRun } from '../../../common/mongo/sessionRun'; import { mongoSessionRun } from '../../../common/mongo/sessionRun';
@ -12,36 +11,53 @@ import { MongoDatasetTraining } from '../training/schema';
export const deleteDatasetsImmediate = async ({ export const deleteDatasetsImmediate = async ({
teamId, teamId,
datasets datasetIds
}: { }: {
teamId: string; teamId: string;
datasets: DatasetSchemaType[]; datasetIds: string[];
}) => { }) => {
const datasetIds = datasets.map((d) => d._id);
// delete training data // delete training data
MongoDatasetTraining.deleteMany({ await MongoDatasetTraining.deleteMany({
teamId, teamId,
datasetId: { $in: datasetIds } datasetId: { $in: datasetIds }
}); });
// Remove cron job // Remove cron job
await Promise.all(
datasetIds.map((id) => {
return removeDatasetSyncJobScheduler(id);
})
);
};
// Clear a team datasets
export const deleteTeamAllDatasets = async (teamId: string) => {
const datasets = await MongoDataset.find(
{
teamId
},
{ _id: 1, parentId: 1 }
);
await deleteDatasetsImmediate({
teamId,
datasetIds: datasets.map((d) => d._id)
});
await Promise.all( await Promise.all(
datasets.map((dataset) => { datasets.map((dataset) => {
// 只处理已标记删除的数据集 if (dataset.parentId) return;
if (datasetIds.includes(dataset._id)) { return addDatasetDeleteJob({
return removeDatasetSyncJobScheduler(dataset._id); teamId,
} datasetId: dataset._id
});
}) })
); );
}; };
export const deleteDatasets = async ({ const deleteDatasets = async ({
teamId, teamId,
datasets datasets
}: { }: {
teamId: string; teamId: string;
datasets: DatasetSchemaType[]; datasets: { _id: string; avatar: string; teamId: string }[];
}) => { }) => {
const datasetIds = datasets.map((d) => d._id); const datasetIds = datasets.map((d) => d._id);
@ -81,7 +97,8 @@ export const datasetDeleteProcessor: Processor<DatasetDeleteJobData> = async (jo
// 1. 查找知识库及其所有子知识库 // 1. 查找知识库及其所有子知识库
const datasets = await findDatasetAndAllChildren({ const datasets = await findDatasetAndAllChildren({
teamId, teamId,
datasetId datasetId,
fields: '_id teamId avatar'
}); });
if (!datasets || datasets.length === 0) { if (!datasets || datasets.length === 0) {

View File

@ -29,6 +29,8 @@ const PromotionRecordSchema = new Schema({
} }
}); });
PromotionRecordSchema.index({ userId: 1 });
export const MongoPromotionRecord = getMongoModel<PromotionRecordType>( export const MongoPromotionRecord = getMongoModel<PromotionRecordType>(
'promotionRecord', 'promotionRecord',
PromotionRecordSchema PromotionRecordSchema

View File

@ -32,14 +32,16 @@ async function handler(req: NextApiRequest) {
const deleteDatasets = await findDatasetAndAllChildren({ const deleteDatasets = await findDatasetAndAllChildren({
teamId, teamId,
datasetId datasetId,
fields: '_id'
}); });
const datasetIds = deleteDatasets.map((d) => d._id);
await mongoSessionRun(async (session) => { await mongoSessionRun(async (session) => {
// 1. Mark as deleted // 1. Mark as deleted
await MongoDataset.updateMany( await MongoDataset.updateMany(
{ {
_id: deleteDatasets.map((d) => d._id), _id: datasetIds,
teamId teamId
}, },
{ {
@ -52,7 +54,7 @@ async function handler(req: NextApiRequest) {
await deleteDatasetsImmediate({ await deleteDatasetsImmediate({
teamId, teamId,
datasets: deleteDatasets datasetIds
}); });
// 2. Add to delete queue // 2. Add to delete queue

View File

@ -5,7 +5,5 @@ import { initAppDeleteWorker } from '@fastgpt/service/core/app/delete';
export const initBullMQWorkers = () => { export const initBullMQWorkers = () => {
addLog.info('Init BullMQ Workers...'); addLog.info('Init BullMQ Workers...');
initS3MQWorker(); return Promise.all([initS3MQWorker(), initDatasetDeleteWorker(), initAppDeleteWorker()]);
initDatasetDeleteWorker();
initAppDeleteWorker();
}; };

View File

@ -84,7 +84,7 @@ describe('App Delete Queue', () => {
} }
}); });
expect(mockQueue.add).toHaveBeenCalledWith('deleteapp', jobData, { expect(mockQueue.add).toHaveBeenCalledWith('delete_app', jobData, {
jobId: 'team-123:app-123', jobId: 'team-123:app-123',
delay: 1000 delay: 1000
}); });
@ -106,7 +106,7 @@ describe('App Delete Queue', () => {
await addAppDeleteJob(jobData); await addAppDeleteJob(jobData);
expect(mockQueue.add).toHaveBeenCalledWith( expect(mockQueue.add).toHaveBeenCalledWith(
'deleteapp', 'delete_app',
jobData, jobData,
expect.objectContaining({ expect.objectContaining({
jobId: 'team-xyz:app-abc' jobId: 'team-xyz:app-abc'
@ -155,7 +155,7 @@ describe('App Delete API Integration', () => {
// Verify queue job was added // Verify queue job was added
expect(mockQueue.add).toHaveBeenCalledWith( expect(mockQueue.add).toHaveBeenCalledWith(
'deleteapp', 'delete_app',
{ {
teamId: rootUser.teamId, teamId: rootUser.teamId,
appId: String(testApp._id) appId: String(testApp._id)