Mirror of https://github.com/labring/FastGPT.git (synced 2025-12-25 20:02:47 +00:00)
V4.14.4 dev (#6058)
* perf: faq
* index
* delete dataset
* delete dataset
* perf: delete dataset
* init
* fix: outLink UID (#6048)
* perf: query extension
* fix: s3 configs (#6050)
* fix: s3 configs
* s3
* s3 valid string check
* perf: completion api
* fix: model test
* perf: init
* fix: init
* fix: init shell
* fix: faq

Co-authored-by: archer <545436317@qq.com>
Co-authored-by: Roy <whoeverimf5@gmail.com>
This commit is contained in:
parent 44f95038b0
commit bdee2db74a
@@ -15,20 +15,20 @@ description: FastGPT share link authentication

### Unified API response format

```json
```jsonc
{
    "success": true,
    "message": "error message",
    "msg": "same as message, an error message",
    "data": {
        "uid": "unique user credential"
        "uid": "unique user credential" // must be returned
    }
}
```

`FastGPT` checks whether `success` is `true` to decide whether the user is allowed to continue. `message` and `msg` are equivalent; you may return either one. When `success` is not `true`, this error message is shown.

`uid` is the user's unique credential and is used to load and save chat history. See the practical example below.
`uid` is the user's unique credential. It must be returned, and it must be a string of at most 255 characters that contains none of the characters "|", "/" or "\"; otherwise an `Invalid UID` error is returned. `uid` is used to load and save chat history; see the practical example below and the sketch after this section.
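Below is a minimal sketch of an identity-verification endpoint that returns this unified format, assuming an Express-style server; the route path, token lookup and helper names are illustrative placeholders rather than FastGPT's own code:

```ts
import express from 'express';

const app = express();

// Share-link auth hook: FastGPT calls this endpoint and expects the unified
// response format described above. The token handling below is a placeholder.
app.post('/shareAuth/init', express.json(), (req, res) => {
  const token = req.body?.token as string | undefined;
  const uid = token ? lookupUidByToken(token) : undefined; // hypothetical helper

  // uid must be at most 255 characters and must not contain "|", "/" or "\"
  if (!uid || uid.length > 255 || /[|/\\]/.test(uid)) {
    return res.json({ success: false, message: 'Invalid UID' });
  }

  return res.json({ success: true, data: { uid } });
});

// Placeholder token-to-uid mapping for the sketch.
function lookupUidByToken(token: string): string | undefined {
  return token === 'demo-token' ? 'user-123' : undefined;
}

app.listen(3005);
```

With this in place, FastGPT treats any response where `success` is not `true` as a rejected login and shows the returned message to the visitor.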
### Trigger flow
@@ -30,6 +30,7 @@ curl --location --request POST 'https://{{host}}/api/admin/initv4144' \

4. Local files uploaded to a knowledge base via the API are now stored in S3; all legacy GridFS code has been removed.
5. New subscription plan logic.
6. Support configuring a whitelist of chat file types.
7. S3 now supports a pathStyle configuration.

## ⚙️ Improvements
@@ -38,6 +39,8 @@ curl --location --request POST 'https://{{host}}/api/admin/initv4144' \

3. User notifications now support both Chinese and English, with improved templates.
4. Knowledge base deletion now runs asynchronously through a deletion queue.
5. Clear error message when an invalid image is passed in an LLM request.
6. The completions API now returns reasoning_content in non-stream mode when detail=false.
7. Added detection of invalid S3 keys.

## 🐛 Bug fixes
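For item 6, a hedged sketch of what reading the new field could look like from a client, assuming the standard /api/v1/chat/completions endpoint; the host, API key and chatId are placeholders:

```ts
// Non-stream, detail=false request against a FastGPT app, then read the
// answer text and the newly returned reasoning_content (if the model produced any).
const res = await fetch('https://fastgpt.example.com/api/v1/chat/completions', {
  method: 'POST',
  headers: {
    Authorization: 'Bearer fastgpt-xxxxx', // placeholder API key
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    chatId: 'my-chat-id',
    stream: false,
    detail: false,
    messages: [{ role: 'user', content: 'hi' }]
  })
});

const data = await res.json();
const message = data.choices?.[0]?.message;
console.log(message?.content);           // the answer text
console.log(message?.reasoning_content); // present when reasoning output exists
```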
@@ -47,6 +50,9 @@ curl --location --request POST 'https://{{host}}/api/admin/initv4144' \

4. Workbench cards were misaligned when the app name was too long.
5. Global variables carried in the URL query of a share link were not loaded by the front-end UI.
6. CSV file detection failed on Windows.
7. Models that were not enabled could not be tested.
8. MCP requests failed when headers contained special characters.
9. When a workflow referenced another agent, the UI was not refreshed promptly after switching the version.

## Plugins
@@ -34,7 +34,7 @@
  "document/content/docs/introduction/development/openapi/chat.mdx": "2025-11-14T13:21:17+08:00",
  "document/content/docs/introduction/development/openapi/dataset.mdx": "2025-09-29T11:34:11+08:00",
  "document/content/docs/introduction/development/openapi/intro.mdx": "2025-09-29T11:34:11+08:00",
  "document/content/docs/introduction/development/openapi/share.mdx": "2025-08-05T23:20:39+08:00",
  "document/content/docs/introduction/development/openapi/share.mdx": "2025-12-08T16:10:51+08:00",
  "document/content/docs/introduction/development/proxy/cloudflare.mdx": "2025-07-23T21:35:03+08:00",
  "document/content/docs/introduction/development/proxy/http_proxy.mdx": "2025-07-23T21:35:03+08:00",
  "document/content/docs/introduction/development/proxy/nginx.mdx": "2025-07-23T21:35:03+08:00",
@@ -118,7 +118,7 @@
  "document/content/docs/upgrading/4-14/4141.mdx": "2025-11-19T10:15:27+08:00",
  "document/content/docs/upgrading/4-14/4142.mdx": "2025-11-18T19:27:14+08:00",
  "document/content/docs/upgrading/4-14/4143.mdx": "2025-11-26T20:52:05+08:00",
  "document/content/docs/upgrading/4-14/4144.mdx": "2025-12-08T01:44:15+08:00",
  "document/content/docs/upgrading/4-14/4144.mdx": "2025-12-08T17:57:59+08:00",
  "document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00",
  "document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00",
  "document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00",
@@ -50,6 +50,7 @@ export class S3BaseBucket {
      port: externalPort,
      accessKey: options.accessKey,
      secretKey: options.secretKey,
      pathStyle: options.pathStyle,
      transportAgent: options.transportAgent
    });
  }
@@ -52,3 +52,9 @@ export const getSystemMaxFileSize = () => {
  const config = global.feConfigs?.uploadFileMaxSize || 1024; // MB, default 1024MB
  return config; // bytes
};

export const S3_KEY_PATH_INVALID_CHARS_MAP: Record<string, boolean> = {
  '/': true,
  '\\': true,
  '|': true
};
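A hedged sketch of how an object key could be validated against such a character map before upload; the helper below is illustrative and redefines the map locally rather than using FastGPT's actual validation function:

```ts
// Mirrors S3_KEY_PATH_INVALID_CHARS_MAP from the hunk above.
const INVALID_S3_KEY_CHARS: Record<string, boolean> = {
  '/': true,
  '\\': true,
  '|': true
};

// Returns true when every path segment of the key is non-empty and contains
// none of the invalid characters ("/" is only used as the segment separator).
export const isValidS3Key = (key: string): boolean => {
  if (!key) return false;
  return key
    .split('/')
    .every(
      (segment) =>
        segment.length > 0 && [...segment].every((char) => !INVALID_S3_KEY_CHARS[char])
    );
};

// isValidS3Key('dataset/abc/file.png') -> true
// isValidS3Key('dataset/bad|name.png') -> false
```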
@@ -4,6 +4,7 @@ import { setCron } from '../system/cron';
import { checkTimerLock } from '../system/timerLock/utils';
import { TimerIdEnum } from '../system/timerLock/constants';
import path from 'node:path';
import { S3Error } from 'minio';

export async function clearExpiredMinioFiles() {
  try {
@@ -56,6 +57,12 @@ export async function clearExpiredMinioFiles() {
        addLog.warn(`Bucket not found: ${file.bucketName}`);
      }
    } catch (error) {
      if (
        error instanceof S3Error &&
        error.message.includes('Object name contains unsupported characters.')
      ) {
        await MongoS3TTL.deleteOne({ _id: file._id });
      }
      fail++;
      addLog.error(`Failed to delete minio file: ${file.minioKey}`, error);
    }
@@ -13,7 +13,7 @@ import { useTextCosine } from '../hooks/useTextCosine';
This module can eliminate referential ambiguity and expand queries based on context to improve retrieval.
Submodular Optimization Mode: generate multiple candidate queries, then use a submodular algorithm to select the optimal query combination.
*/
const title = global.feConfigs?.systemTitle || 'FastAI';
const title = global.feConfigs?.systemTitle || 'Nginx';
const defaultPrompt = `## 你的任务
你作为一个向量检索助手,你的任务是结合历史记录,为"原问题"生成{{count}}个不同版本的"检索词"。这些检索词应该从不同角度探索主题,以提高向量检索的语义丰富度和精度。
@@ -230,7 +230,7 @@ assistant: ${chatBg}
      .replace(/ /g, '');

    try {
      const queries = json5.parse(jsonStr) as string[];
      let queries = json5.parse(jsonStr) as string[];

      if (!Array.isArray(queries) || queries.length === 0) {
        return {
@@ -248,6 +248,8 @@ assistant: ${chatBg}
    const { lazyGreedyQuerySelection, embeddingModel: useEmbeddingModel } = useTextCosine({
      embeddingModel
    });
    queries = queries.map((item) => String(item));

    const { selectedData: selectedQueries, embeddingTokens } = await lazyGreedyQuerySelection({
      originalText: query,
      candidates: queries,
@@ -81,7 +81,7 @@ export const createLLMResponse = async <T extends CompletionsBodyType>(
    return requestMessages;
  })();

  const requestBody = await llmCompletionsBodyFormat({
  const { requestBody, modelData } = await llmCompletionsBodyFormat({
    ...body,
    messages: rewriteMessages
  });
@@ -89,6 +89,7 @@ export const createLLMResponse = async <T extends CompletionsBodyType>(
  // console.log(JSON.stringify(requestBody, null, 2));
  const { response, isStreamResponse, getEmptyResponseTip } = await createChatCompletion({
    body: requestBody,
    modelData,
    userKey,
    options: {
      headers: {
@@ -491,10 +492,16 @@ const llmCompletionsBodyFormat = async <T extends CompletionsBodyType>({
  parallel_tool_calls,
  toolCallMode,
  ...body
}: LLMRequestBodyType<T>): Promise<InferCompletionsBody<T>> => {
}: LLMRequestBodyType<T>): Promise<{
  requestBody: InferCompletionsBody<T>;
  modelData: LLMModelItemType;
}> => {
  const modelData = getLLMModel(body.model);
  if (!modelData) {
    return body as unknown as InferCompletionsBody<T>;
    return {
      requestBody: body as unknown as InferCompletionsBody<T>,
      modelData
    };
  }

  const response_format = (() => {
@@ -548,7 +555,10 @@ const llmCompletionsBodyFormat = async <T extends CompletionsBodyType>({
    });
  }

  return requestBody as unknown as InferCompletionsBody<T>;
  return {
    requestBody: requestBody as unknown as InferCompletionsBody<T>,
    modelData
  };
};
const createChatCompletion = async ({
  modelData,
@@ -579,6 +589,7 @@ const createChatCompletion = async ({
  try {
    // Rewrite model
    const modelConstantsData = modelData || getLLMModel(body.model);

    if (!modelConstantsData) {
      return Promise.reject(`${body.model} not found`);
    }
@@ -115,6 +115,25 @@ export async function delDatasetRelevantData({
  // Delete vector data
  await deleteDatasetDataVector({ teamId, datasetIds });

  // Delete dataset_data_texts in batches by datasetId
  for (const datasetId of datasetIds) {
    await MongoDatasetDataText.deleteMany({
      teamId,
      datasetId
    }).maxTimeMS(300000); // Reduce timeout for single batch
  }
  // Delete dataset_datas in batches by datasetId
  for (const datasetId of datasetIds) {
    await MongoDatasetData.deleteMany({
      teamId,
      datasetId
    }).maxTimeMS(300000);
  }

  await delCollectionRelatedSource({ collections });
  // Delete vector data
  await deleteDatasetDataVector({ teamId, datasetIds });

  // delete collections
  await MongoDatasetCollection.deleteMany({
    teamId,
@@ -5,15 +5,11 @@ import { addDays } from 'date-fns';
import { isS3ObjectKey, jwtSignS3ObjectKey } from '../../../common/s3/utils';

export const formatDatasetDataValue = ({
  teamId,
  datasetId,
  q,
  a,
  imageId,
  imageDescMap
}: {
  teamId: string;
  datasetId: string;
  q: string;
  a?: string;
  imageId?: string;
@@ -73,8 +69,6 @@ export const getFormatDatasetCiteList = (list: DatasetDataSchemaType[]) => {
  return list.map((item) => ({
    _id: item._id,
    ...formatDatasetDataValue({
      teamId: item.teamId,
      datasetId: item.datasetId,
      q: item.q,
      a: item.a,
      imageId: item.imageId
@@ -555,8 +555,6 @@ export async function searchDatasetData(
        id: String(data._id),
        updateTime: data.updateTime,
        ...formatDatasetDataValue({
          teamId,
          datasetId: data.datasetId,
          q: data.q,
          a: data.a,
          imageId: data.imageId,
@@ -727,8 +725,6 @@ export async function searchDatasetData(
        collectionId: String(data.collectionId),
        updateTime: data.updateTime,
        ...formatDatasetDataValue({
          teamId,
          datasetId: data.datasetId,
          q: data.q,
          a: data.a,
          imageId: data.imageId,
@@ -14,6 +14,7 @@ import { i18nT } from '../../../../../web/i18n/utils';
import { filterDatasetsByTmbId } from '../../../dataset/utils';
import { getDatasetSearchToolResponsePrompt } from '../../../../../global/core/ai/prompt/dataset';
import { getNodeErrResponse } from '../utils';
import { addLog } from '../../../../common/system/log';

type DatasetSearchProps = ModuleDispatchProps<{
  [NodeInputKeyEnum.datasetSelectList]: SelectedDatasetType[];
@@ -49,7 +50,6 @@ export async function dispatchDatasetSearch(
  const {
    runningAppInfo: { teamId },
    runningUserInfo: { tmbId },
    uid,
    histories,
    node,
    params: {
@@ -281,6 +281,7 @@ export async function dispatchDatasetSearch(
        : 'No results'
    };
  } catch (error) {
    addLog.error(`[Dataset search] error`, error);
    return getNodeErrResponse({ error });
  }
}
@@ -43,7 +43,7 @@ S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin
S3_PUBLIC_BUCKET=fastgpt-public # public bucket for plugin file storage
S3_PRIVATE_BUCKET=fastgpt-private # private bucket for plugin file storage
S3_PATH_STYLE=false # forcePathStyle defaults to true; it is disabled only when set to "false", any other value is treated as true
S3_PATH_STYLE=true # forcePathStyle defaults to true; it is disabled only when set to "false", any other value is treated as true

# Redis URL
REDIS_URL=redis://default:mypassword@127.0.0.1:6379
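A hedged sketch of how the S3_PATH_STYLE semantics described in the comment could be wired into the MinIO client options; the parse helper and the S3_ENDPOINT / S3_PORT variable names are assumptions for illustration:

```ts
// forcePathStyle is true unless the env var is exactly the string "false".
const parsePathStyle = (value?: string): boolean => value !== 'false';

const s3ClientOptions = {
  endPoint: process.env.S3_ENDPOINT || '127.0.0.1', // assumed env name
  port: Number(process.env.S3_PORT || 9000),        // assumed env name
  accessKey: process.env.S3_ACCESS_KEY || 'minioadmin',
  secretKey: process.env.S3_SECRET_KEY || 'minioadmin',
  pathStyle: parsePathStyle(process.env.S3_PATH_STYLE)
};

console.log(s3ClientOptions.pathStyle); // true for "true", "1" or unset; false only for "false"
```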
@@ -9,10 +9,6 @@ const FAQ = () => {
      title: t('common:FAQ.switch_package_q'),
      desc: t('common:FAQ.switch_package_a')
    },
    {
      title: t('common:FAQ.year_day_q'),
      desc: t('common:FAQ.year_day_a')
    },
    {
      title: t('common:FAQ.check_subscription_q'),
      desc: t('common:FAQ.check_subscription_a')
@@ -42,6 +38,10 @@ const FAQ = () => {
      title: t('common:FAQ.qpm_q'),
      desc: t('common:FAQ.qpm_a')
    },
    {
      title: t('common:FAQ.year_day_q'),
      desc: t('common:FAQ.year_day_a')
    },
    {
      title: t('common:FAQ.free_user_clean_q'),
      desc: t('common:FAQ.free_user_clean_a')
@ -1,553 +0,0 @@
|
|||
import { NextAPI } from '@/service/middleware/entry';
|
||||
import { addLog } from '@fastgpt/service/common/system/log';
|
||||
import { authCert } from '@fastgpt/service/support/permission/auth/common';
|
||||
import { type NextApiRequest, type NextApiResponse } from 'next';
|
||||
import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset';
|
||||
import type { getDownloadStream } from '@fastgpt/service/common/file/gridfs/controller';
|
||||
import { getGFSCollection } from '@fastgpt/service/common/file/gridfs/controller';
|
||||
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
|
||||
import pLimit from 'p-limit';
|
||||
import { MongoDatasetMigrationLog } from '@fastgpt/service/core/dataset/migration/schema';
|
||||
import type { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type';
|
||||
import { randomUUID } from 'crypto';
|
||||
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
|
||||
import type { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type';
|
||||
import {
|
||||
uploadImage2S3Bucket,
|
||||
removeS3TTL,
|
||||
getFileS3Key,
|
||||
truncateFilename
|
||||
} from '@fastgpt/service/common/s3/utils';
|
||||
import { connectionMongo, Types } from '@fastgpt/service/common/mongo';
|
||||
|
||||
// 将 GridFS 的流转换为 Buffer
|
||||
async function gridFSStreamToBuffer(
|
||||
stream: Awaited<ReturnType<typeof getDownloadStream>>
|
||||
): Promise<Buffer> {
|
||||
const chunks: Buffer[] = [];
|
||||
|
||||
stream.on('data', (chunk) => chunks.push(chunk));
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
|
||||
return Buffer.concat(chunks);
|
||||
}
|
||||
|
||||
// ========== Dataset Image Migration Functions ==========
|
||||
|
||||
// 获取 dataset_image 的 GridFS bucket
|
||||
function getDatasetImageGridBucket() {
|
||||
return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
|
||||
bucketName: 'dataset_image'
|
||||
});
|
||||
}
|
||||
|
||||
// 获取 dataset_image 的 GridFS collection
|
||||
function getDatasetImageGFSCollection() {
|
||||
return connectionMongo.connection.db!.collection('dataset_image.files');
|
||||
}
|
||||
|
||||
// 处理单批 image
|
||||
async function processImageBatch({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
offset,
|
||||
limit,
|
||||
concurrency
|
||||
}: {
|
||||
batchId: string;
|
||||
migrationVersion: string;
|
||||
offset: number;
|
||||
limit: number;
|
||||
concurrency: number;
|
||||
}) {
|
||||
// 1. 获取这一批的 GridFS 图片文件
|
||||
const imageFiles = await getDatasetImageGFSCollection()
|
||||
.find(
|
||||
{},
|
||||
{
|
||||
projection: {
|
||||
_id: 1,
|
||||
filename: 1,
|
||||
contentType: 1,
|
||||
length: 1,
|
||||
metadata: 1
|
||||
}
|
||||
}
|
||||
)
|
||||
.skip(offset)
|
||||
.limit(limit)
|
||||
.toArray();
|
||||
|
||||
if (imageFiles.length === 0) {
|
||||
return { processed: 0, succeeded: 0, failed: 0, skipped: 0 };
|
||||
}
|
||||
|
||||
// 2. 获取所有的 imageId,并在 dataset_datas 中查找对应的记录
|
||||
const imageIds = imageFiles.map((file) => file._id.toString());
|
||||
const dataList = await MongoDatasetData.find(
|
||||
{
|
||||
teamId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.teamId))) },
|
||||
datasetId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.datasetId))) },
|
||||
collectionId: {
|
||||
$in: Array.from(new Set(imageFiles.map((file) => file.metadata?.collectionId)))
|
||||
},
|
||||
imageId: { $in: imageIds }
|
||||
},
|
||||
'_id imageId teamId datasetId collectionId updateTime'
|
||||
).lean();
|
||||
|
||||
if (dataList.length === 0) {
|
||||
return { processed: 0, succeeded: 0, failed: 0, skipped: 0 };
|
||||
}
|
||||
|
||||
// 3. 过滤已完成的
|
||||
const completedMigrations = await MongoDatasetMigrationLog.find(
|
||||
{
|
||||
resourceType: 'data_image',
|
||||
resourceId: { $in: dataList.map((d) => d._id) },
|
||||
status: 'completed'
|
||||
},
|
||||
'resourceId'
|
||||
).lean();
|
||||
|
||||
const completedIds = new Set(completedMigrations.map((m) => m.resourceId.toString()));
|
||||
const pendingDataList = dataList.filter((d) => !completedIds.has(d._id.toString()));
|
||||
|
||||
const skippedCount = dataList.length - pendingDataList.length;
|
||||
|
||||
if (pendingDataList.length === 0) {
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Image batch all skipped. Total: ${dataList.length}, Skipped: ${skippedCount}`
|
||||
);
|
||||
return {
|
||||
processed: dataList.length,
|
||||
succeeded: 0,
|
||||
failed: 0,
|
||||
skipped: skippedCount
|
||||
};
|
||||
}
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Processing ${pendingDataList.length} images (${skippedCount} skipped)`
|
||||
);
|
||||
|
||||
// 4. 创建 imageId 到 file 的映射
|
||||
const imageFileMap = new Map(imageFiles.map((file) => [file._id.toString(), file]));
|
||||
|
||||
// 5. 为每个 data 关联对应的 image file
|
||||
const imageDataPairs = pendingDataList
|
||||
.map((data) => {
|
||||
const imageFile = imageFileMap.get(data.imageId!);
|
||||
if (!imageFile) {
|
||||
addLog.warn(
|
||||
`[Migration ${batchId}] Image file not found for imageId: ${data.imageId}, dataId: ${data._id}`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
return { data, imageFile };
|
||||
})
|
||||
.filter((pair) => pair !== null);
|
||||
|
||||
if (imageDataPairs.length === 0) {
|
||||
return { processed: dataList.length, succeeded: 0, failed: 0, skipped: dataList.length };
|
||||
}
|
||||
|
||||
// 6. 创建迁移日志
|
||||
const imageMigrationLogs = imageDataPairs.map(({ data, imageFile }) => ({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
resourceType: 'data_image' as const,
|
||||
resourceId: data._id,
|
||||
teamId: data.teamId,
|
||||
datasetId: data.datasetId,
|
||||
sourceStorage: {
|
||||
type: 'gridfs' as const,
|
||||
fileId: data.imageId,
|
||||
bucketName: 'dataset_image' as any
|
||||
},
|
||||
status: 'pending' as const,
|
||||
attemptCount: 0,
|
||||
maxAttempts: 3,
|
||||
verified: false,
|
||||
operations: [],
|
||||
metadata: {
|
||||
fileName: imageFile.filename,
|
||||
originalUpdateTime: data.updateTime,
|
||||
nodeEnv: process.env.NODE_ENV
|
||||
}
|
||||
}));
|
||||
|
||||
if (imageMigrationLogs.length > 0) {
|
||||
await MongoDatasetMigrationLog.insertMany(imageMigrationLogs, { ordered: false });
|
||||
}
|
||||
|
||||
// 7. 执行迁移
|
||||
const limitFn = pLimit(concurrency);
|
||||
let succeeded = 0;
|
||||
let failed = 0;
|
||||
|
||||
const tasks = imageDataPairs.map(({ data, imageFile }) =>
|
||||
limitFn(async () => {
|
||||
try {
|
||||
const { key, dataId } = await migrateDatasetImage({ batchId, data, imageFile });
|
||||
await updateDatasetDataImageId({ batchId, dataId, key });
|
||||
succeeded++;
|
||||
} catch (error) {
|
||||
failed++;
|
||||
addLog.error(`[Migration ${batchId}] Failed to migrate image for data ${data._id}:`, error);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.allSettled(tasks);
|
||||
|
||||
return {
|
||||
processed: dataList.length,
|
||||
succeeded,
|
||||
failed,
|
||||
skipped: skippedCount
|
||||
};
|
||||
}
|
||||
|
||||
// 从 GridFS 迁移单个图片到 S3
|
||||
async function migrateDatasetImage({
|
||||
batchId,
|
||||
data,
|
||||
imageFile
|
||||
}: {
|
||||
batchId: string;
|
||||
data: DatasetDataSchemaType;
|
||||
imageFile: any;
|
||||
}) {
|
||||
const { imageId, datasetId, _id } = data;
|
||||
const dataId = _id.toString();
|
||||
|
||||
try {
|
||||
// 更新状态为处理中
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'processing',
|
||||
startedAt: new Date(),
|
||||
lastAttemptAt: new Date()
|
||||
},
|
||||
$inc: { attemptCount: 1 }
|
||||
}
|
||||
);
|
||||
|
||||
// 阶段 1: 从 GridFS 下载
|
||||
const downloadStartTime = Date.now();
|
||||
let buffer: Buffer;
|
||||
try {
|
||||
const bucket = getDatasetImageGridBucket();
|
||||
const stream = bucket.openDownloadStream(new Types.ObjectId(imageId!));
|
||||
buffer = await gridFSStreamToBuffer(stream);
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'download_from_gridfs',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - downloadStartTime,
|
||||
details: {
|
||||
fileSize: buffer.length,
|
||||
filename: imageFile.filename
|
||||
}
|
||||
}
|
||||
},
|
||||
$set: {
|
||||
'sourceStorage.fileSize': buffer.length
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'download'
|
||||
}
|
||||
}
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 阶段 2: 上传到 S3
|
||||
const uploadStartTime = Date.now();
|
||||
let key: string;
|
||||
try {
|
||||
// 从文件名中提取扩展名
|
||||
const mimetype = imageFile.contentType || 'image/png';
|
||||
const filename = imageFile.filename || 'image.png';
|
||||
|
||||
// 截断文件名以避免S3 key过长的问题
|
||||
const truncatedFilename = truncateFilename(filename);
|
||||
|
||||
// 构造 S3 key
|
||||
const { fileKey: s3Key } = getFileS3Key.dataset({ datasetId, filename: truncatedFilename });
|
||||
|
||||
// 使用 uploadImage2S3Bucket 上传图片(不设置过期时间)
|
||||
key = await uploadImage2S3Bucket('private', {
|
||||
base64Img: buffer.toString('base64'),
|
||||
uploadKey: s3Key,
|
||||
mimetype,
|
||||
filename: truncatedFilename,
|
||||
expiredTime: undefined // 不设置过期时间
|
||||
});
|
||||
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'upload_to_s3',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - uploadStartTime,
|
||||
details: {
|
||||
s3Key: key
|
||||
}
|
||||
}
|
||||
},
|
||||
$set: {
|
||||
'targetStorage.key': key,
|
||||
'targetStorage.fileSize': buffer.length
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'upload'
|
||||
}
|
||||
}
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
|
||||
return {
|
||||
key,
|
||||
dataId
|
||||
};
|
||||
} catch (error) {
|
||||
addLog.error(`[Migration ${batchId}] Failed to migrate image for data ${dataId}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 更新 dataset_datas 的 imageId 为 S3 的 key
|
||||
async function updateDatasetDataImageId({
|
||||
batchId,
|
||||
dataId,
|
||||
key
|
||||
}: {
|
||||
batchId: string;
|
||||
dataId: string;
|
||||
key: string;
|
||||
}) {
|
||||
const updateStartTime = Date.now();
|
||||
|
||||
try {
|
||||
// 更新 data imageId
|
||||
await MongoDatasetData.updateOne({ _id: dataId }, { $set: { imageId: key } });
|
||||
|
||||
// 标记迁移为完成
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: dataId },
|
||||
{
|
||||
$set: {
|
||||
status: 'completed',
|
||||
completedAt: new Date()
|
||||
},
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'update_data_imageId',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - updateStartTime,
|
||||
details: {
|
||||
newImageId: key
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return {
|
||||
dataId,
|
||||
key
|
||||
};
|
||||
} catch (error) {
|
||||
// 标记迁移为失败
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: dataId },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'update_db'
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
addLog.error(`[Migration ${batchId}] Failed to update data ${dataId}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 批量删除已完成迁移的 S3 文件的 TTL
|
||||
async function removeTTLForCompletedMigrations(batchId: string) {
|
||||
try {
|
||||
addLog.info(`[Migration ${batchId}] Removing TTL for completed migrations...`);
|
||||
|
||||
// 分批删除,避免一次查询太多
|
||||
const BATCH_SIZE = 5000;
|
||||
let offset = 0;
|
||||
let totalRemoved = 0;
|
||||
|
||||
while (true) {
|
||||
const completedMigrations = await MongoDatasetMigrationLog.find(
|
||||
{
|
||||
batchId,
|
||||
status: 'completed',
|
||||
'targetStorage.key': { $exists: true, $ne: null }
|
||||
},
|
||||
'targetStorage.key'
|
||||
)
|
||||
.skip(offset)
|
||||
.limit(BATCH_SIZE)
|
||||
.lean();
|
||||
|
||||
if (completedMigrations.length === 0) break;
|
||||
|
||||
const keys = completedMigrations
|
||||
.map((log) => log.targetStorage?.key)
|
||||
.filter(Boolean) as string[];
|
||||
|
||||
if (keys.length > 0) {
|
||||
await removeS3TTL({ key: keys, bucketName: 'private' });
|
||||
totalRemoved += keys.length;
|
||||
addLog.info(`[Migration ${batchId}] Removed TTL for ${totalRemoved} objects so far`);
|
||||
}
|
||||
|
||||
offset += BATCH_SIZE;
|
||||
|
||||
if (completedMigrations.length < BATCH_SIZE) break;
|
||||
}
|
||||
|
||||
addLog.info(`[Migration ${batchId}] Total TTL removed: ${totalRemoved}`);
|
||||
} catch (error) {
|
||||
addLog.error(`[Migration ${batchId}] Failed to remove TTL:`, error);
|
||||
// 不抛出错误,因为这不是致命问题
|
||||
}
|
||||
}
|
||||
|
||||
async function handler(req: NextApiRequest, _res: NextApiResponse) {
|
||||
await authCert({ req, authRoot: true });
|
||||
|
||||
// 迁移配置
|
||||
const config = {
|
||||
collectionBatchSize: 500,
|
||||
collectionConcurrency: 10,
|
||||
imageBatchSize: 500,
|
||||
imageConcurrency: 5,
|
||||
pauseBetweenBatches: 1000 // ms
|
||||
};
|
||||
|
||||
// 生成唯一的批次 ID
|
||||
const batchId = `migration_${Date.now()}_${randomUUID()}`;
|
||||
const migrationVersion = 'v4.14.3';
|
||||
|
||||
addLog.info(`[Migration ${batchId}] Starting migration ${migrationVersion}`);
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Config: collectionBatch=${config.collectionBatchSize}, collectionConcurrency=${config.collectionConcurrency}, imageBatch=${config.imageBatchSize}, imageConcurrency=${config.imageConcurrency}`
|
||||
);
|
||||
|
||||
// ========== Image Migration ==========
|
||||
addLog.info(`[Migration ${batchId}] Starting image migration...`);
|
||||
|
||||
const totalImageFiles = await getDatasetImageGFSCollection().countDocuments({});
|
||||
addLog.info(`[Migration ${batchId}] Total image files in GridFS: ${totalImageFiles}`);
|
||||
|
||||
let imageStats = {
|
||||
processed: 0,
|
||||
succeeded: 0,
|
||||
failed: 0,
|
||||
skipped: 0
|
||||
};
|
||||
|
||||
// 分批处理 images
|
||||
for (let offset = 0; offset < totalImageFiles; offset += config.imageBatchSize) {
|
||||
const currentBatch = Math.floor(offset / config.imageBatchSize) + 1;
|
||||
const totalBatches = Math.ceil(totalImageFiles / config.imageBatchSize);
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Processing images batch ${currentBatch}/${totalBatches} (${offset}-${offset + config.imageBatchSize})`
|
||||
);
|
||||
|
||||
const batchStats = await processImageBatch({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
offset,
|
||||
limit: config.imageBatchSize,
|
||||
concurrency: config.imageConcurrency
|
||||
});
|
||||
|
||||
imageStats.processed += batchStats.processed;
|
||||
imageStats.succeeded += batchStats.succeeded;
|
||||
imageStats.failed += batchStats.failed;
|
||||
imageStats.skipped += batchStats.skipped;
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Batch ${currentBatch}/${totalBatches} completed. Batch: +${batchStats.succeeded} succeeded, +${batchStats.failed} failed. Total progress: ${imageStats.succeeded}/${totalImageFiles}`
|
||||
);
|
||||
|
||||
// 暂停一下
|
||||
if (offset + config.imageBatchSize < totalImageFiles) {
|
||||
await new Promise((resolve) => setTimeout(resolve, config.pauseBetweenBatches));
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 批量删除已完成迁移的 TTL ==========
|
||||
await removeTTLForCompletedMigrations(batchId);
|
||||
|
||||
// ========== 汇总统计 ==========
|
||||
addLog.info(`[Migration ${batchId}] ========== Migration Summary ==========`);
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Images - Total: ${totalImageFiles}, Succeeded: ${imageStats.succeeded}, Failed: ${imageStats.failed}, Skipped: ${imageStats.skipped}`
|
||||
);
|
||||
addLog.info(`[Migration ${batchId}] =======================================`);
|
||||
|
||||
return {
|
||||
batchId,
|
||||
migrationVersion,
|
||||
summary: {
|
||||
images: {
|
||||
total: totalImageFiles,
|
||||
processed: imageStats.processed,
|
||||
succeeded: imageStats.succeeded,
|
||||
failed: imageStats.failed,
|
||||
skipped: imageStats.skipped
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export default NextAPI(handler);
|
||||
|
|
@@ -244,7 +244,7 @@ async function processCollectionBatch({
      {
        projection: {
          _id: 1,
          metadata: { teamId: 1 }
          metadata: 1
        }
      }
    )
@@ -259,7 +259,10 @@ async function processCollectionBatch({
  // 2. Find the corresponding collections
  const fileIds = files.map((f) => f._id);
  const collections = await MongoDatasetCollection.find(
    { fileId: { $in: fileIds } },
    {
      teamId: { $in: Array.from(new Set(files.map((f) => f.metadata?.teamId).filter(Boolean))) },
      fileId: { $in: fileIds }
    },
    '_id fileId teamId datasetId type parentId name updateTime'
  ).lean();
@@ -531,6 +534,11 @@ async function processImageBatch({
  const imageIds = imageFiles.map((file) => file._id.toString());
  const dataList = await MongoDatasetData.find(
    {
      teamId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.teamId))) },
      datasetId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.datasetId))) },
      collectionId: {
        $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.collectionId)))
      },
      imageId: { $in: imageIds }
    },
    '_id imageId teamId datasetId collectionId updateTime'
@@ -244,7 +244,7 @@ async function processCollectionBatch({
      {
        projection: {
          _id: 1,
          metadata: { teamId: 1 }
          metadata: 1
        }
      }
    )
@@ -260,7 +260,10 @@ async function processCollectionBatch({
  // 2. Find the corresponding collections
  const fileIds = files.map((f) => f._id);
  const collections = await MongoDatasetCollection.find(
    { fileId: { $in: fileIds, $not: { $regex: /^dataset\// } } },
    {
      teamId: { $in: Array.from(new Set(files.map((f) => f.metadata?.teamId).filter(Boolean))) },
      fileId: { $in: fileIds, $not: { $regex: /^dataset\// } }
    },
    '_id fileId teamId datasetId type parentId name updateTime'
  ).lean();
@ -423,6 +426,381 @@ async function updateDatasetCollectionFileId({
|
|||
}
|
||||
}
|
||||
|
||||
// ========== Dataset Image Migration Functions ==========
|
||||
|
||||
// 获取 dataset_image 的 GridFS bucket
|
||||
function getDatasetImageGridBucket() {
|
||||
return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
|
||||
bucketName: 'dataset_image'
|
||||
});
|
||||
}
|
||||
|
||||
// 获取 dataset_image 的 GridFS collection
|
||||
function getDatasetImageGFSCollection() {
|
||||
return connectionMongo.connection.db!.collection('dataset_image.files');
|
||||
}
|
||||
|
||||
// 处理单批 image
|
||||
async function processImageBatch({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
offset,
|
||||
limit,
|
||||
concurrency
|
||||
}: {
|
||||
batchId: string;
|
||||
migrationVersion: string;
|
||||
offset: number;
|
||||
limit: number;
|
||||
concurrency: number;
|
||||
}) {
|
||||
// 1. 获取这一批的 GridFS 图片文件
|
||||
const imageFiles = await getDatasetImageGFSCollection()
|
||||
.find(
|
||||
{},
|
||||
{
|
||||
projection: {
|
||||
_id: 1,
|
||||
filename: 1,
|
||||
contentType: 1,
|
||||
length: 1,
|
||||
metadata: 1
|
||||
}
|
||||
}
|
||||
)
|
||||
.skip(offset)
|
||||
.limit(limit)
|
||||
.toArray();
|
||||
|
||||
if (imageFiles.length === 0) {
|
||||
return { processed: 0, succeeded: 0, failed: 0, skipped: 0 };
|
||||
}
|
||||
|
||||
// 2. 获取所有的 imageId,并在 dataset_datas 中查找对应的记录
|
||||
const imageIds = imageFiles.map((file) => file._id.toString());
|
||||
const dataList = await MongoDatasetData.find(
|
||||
{
|
||||
teamId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.teamId))) },
|
||||
datasetId: { $in: Array.from(new Set(imageFiles.map((file) => file.metadata?.datasetId))) },
|
||||
collectionId: {
|
||||
$in: Array.from(new Set(imageFiles.map((file) => file.metadata?.collectionId)))
|
||||
},
|
||||
imageId: { $in: imageIds }
|
||||
},
|
||||
'_id imageId teamId datasetId collectionId updateTime'
|
||||
).lean();
|
||||
|
||||
if (dataList.length === 0) {
|
||||
return { processed: 0, succeeded: 0, failed: 0, skipped: 0 };
|
||||
}
|
||||
|
||||
// 3. 过滤已完成的
|
||||
const completedMigrations = await MongoDatasetMigrationLog.find(
|
||||
{
|
||||
resourceType: 'data_image',
|
||||
resourceId: { $in: dataList.map((d) => d._id) },
|
||||
status: 'completed'
|
||||
},
|
||||
'resourceId'
|
||||
).lean();
|
||||
|
||||
const completedIds = new Set(completedMigrations.map((m) => m.resourceId.toString()));
|
||||
const pendingDataList = dataList.filter((d) => !completedIds.has(d._id.toString()));
|
||||
|
||||
const skippedCount = dataList.length - pendingDataList.length;
|
||||
|
||||
if (pendingDataList.length === 0) {
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Image batch all skipped. Total: ${dataList.length}, Skipped: ${skippedCount}`
|
||||
);
|
||||
return {
|
||||
processed: dataList.length,
|
||||
succeeded: 0,
|
||||
failed: 0,
|
||||
skipped: skippedCount
|
||||
};
|
||||
}
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Processing ${pendingDataList.length} images (${skippedCount} skipped)`
|
||||
);
|
||||
|
||||
// 4. 创建 imageId 到 file 的映射
|
||||
const imageFileMap = new Map(imageFiles.map((file) => [file._id.toString(), file]));
|
||||
|
||||
// 5. 为每个 data 关联对应的 image file
|
||||
const imageDataPairs = pendingDataList
|
||||
.map((data) => {
|
||||
const imageFile = imageFileMap.get(data.imageId!);
|
||||
if (!imageFile) {
|
||||
addLog.warn(
|
||||
`[Migration ${batchId}] Image file not found for imageId: ${data.imageId}, dataId: ${data._id}`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
return { data, imageFile };
|
||||
})
|
||||
.filter((pair) => pair !== null);
|
||||
|
||||
if (imageDataPairs.length === 0) {
|
||||
return { processed: dataList.length, succeeded: 0, failed: 0, skipped: dataList.length };
|
||||
}
|
||||
|
||||
// 6. 创建迁移日志
|
||||
const imageMigrationLogs = imageDataPairs.map(({ data, imageFile }) => ({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
resourceType: 'dataset_image' as const,
|
||||
resourceId: data._id,
|
||||
teamId: data.teamId,
|
||||
datasetId: data.datasetId,
|
||||
sourceStorage: {
|
||||
type: 'gridfs' as const,
|
||||
fileId: data.imageId,
|
||||
bucketName: 'dataset_image' as any
|
||||
},
|
||||
status: 'pending' as const,
|
||||
attemptCount: 0,
|
||||
maxAttempts: 3,
|
||||
verified: false,
|
||||
operations: [],
|
||||
metadata: {
|
||||
fileName: imageFile.filename,
|
||||
originalUpdateTime: data.updateTime,
|
||||
nodeEnv: process.env.NODE_ENV
|
||||
}
|
||||
}));
|
||||
|
||||
if (imageMigrationLogs.length > 0) {
|
||||
await MongoDatasetMigrationLog.insertMany(imageMigrationLogs, { ordered: false });
|
||||
}
|
||||
|
||||
// 7. 执行迁移
|
||||
const limitFn = pLimit(concurrency);
|
||||
let succeeded = 0;
|
||||
let failed = 0;
|
||||
|
||||
const tasks = imageDataPairs.map(({ data, imageFile }) =>
|
||||
limitFn(async () => {
|
||||
try {
|
||||
const { key, dataId } = await migrateDatasetImage({ batchId, data, imageFile });
|
||||
await updateDatasetDataImageId({ batchId, dataId, key });
|
||||
succeeded++;
|
||||
} catch (error) {
|
||||
failed++;
|
||||
addLog.error(`[Migration ${batchId}] Failed to migrate image for data ${data._id}:`, error);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.allSettled(tasks);
|
||||
|
||||
return {
|
||||
processed: dataList.length,
|
||||
succeeded,
|
||||
failed,
|
||||
skipped: skippedCount
|
||||
};
|
||||
}
|
||||
|
||||
// 从 GridFS 迁移单个图片到 S3
|
||||
async function migrateDatasetImage({
|
||||
batchId,
|
||||
data,
|
||||
imageFile
|
||||
}: {
|
||||
batchId: string;
|
||||
data: DatasetDataSchemaType;
|
||||
imageFile: any;
|
||||
}) {
|
||||
const { imageId, datasetId, _id } = data;
|
||||
const dataId = _id.toString();
|
||||
|
||||
try {
|
||||
// 更新状态为处理中
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'processing',
|
||||
startedAt: new Date(),
|
||||
lastAttemptAt: new Date()
|
||||
},
|
||||
$inc: { attemptCount: 1 }
|
||||
}
|
||||
);
|
||||
|
||||
// 阶段 1: 从 GridFS 下载
|
||||
const downloadStartTime = Date.now();
|
||||
let buffer: Buffer;
|
||||
try {
|
||||
const bucket = getDatasetImageGridBucket();
|
||||
const stream = bucket.openDownloadStream(new Types.ObjectId(imageId!));
|
||||
buffer = await gridFSStreamToBuffer(stream);
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'download_from_gridfs',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - downloadStartTime,
|
||||
details: {
|
||||
fileSize: buffer.length,
|
||||
filename: imageFile.filename
|
||||
}
|
||||
}
|
||||
},
|
||||
$set: {
|
||||
'sourceStorage.fileSize': buffer.length
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'download'
|
||||
}
|
||||
}
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 阶段 2: 上传到 S3
|
||||
const uploadStartTime = Date.now();
|
||||
let key: string;
|
||||
try {
|
||||
// 从文件名中提取扩展名
|
||||
const mimetype = imageFile.contentType || 'image/png';
|
||||
const filename = imageFile.filename || 'image.png';
|
||||
|
||||
// 截断文件名以避免S3 key过长的问题
|
||||
const truncatedFilename = truncateFilename(filename);
|
||||
|
||||
// 构造 S3 key
|
||||
const { fileKey: s3Key } = getFileS3Key.dataset({ datasetId, filename: truncatedFilename });
|
||||
|
||||
// 使用 uploadImage2S3Bucket 上传图片(不设置过期时间)
|
||||
key = await uploadImage2S3Bucket('private', {
|
||||
base64Img: buffer.toString('base64'),
|
||||
uploadKey: s3Key,
|
||||
mimetype,
|
||||
filename: truncatedFilename,
|
||||
expiredTime: undefined // 不设置过期时间
|
||||
});
|
||||
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'upload_to_s3',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - uploadStartTime,
|
||||
details: {
|
||||
s3Key: key
|
||||
}
|
||||
}
|
||||
},
|
||||
$set: {
|
||||
'targetStorage.key': key,
|
||||
'targetStorage.fileSize': buffer.length
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: _id },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'upload'
|
||||
}
|
||||
}
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
|
||||
return {
|
||||
key,
|
||||
dataId
|
||||
};
|
||||
} catch (error) {
|
||||
addLog.error(`[Migration ${batchId}] Failed to migrate image for data ${dataId}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 更新 dataset_datas 的 imageId 为 S3 的 key
|
||||
async function updateDatasetDataImageId({
|
||||
batchId,
|
||||
dataId,
|
||||
key
|
||||
}: {
|
||||
batchId: string;
|
||||
dataId: string;
|
||||
key: string;
|
||||
}) {
|
||||
const updateStartTime = Date.now();
|
||||
|
||||
try {
|
||||
// 更新 data imageId
|
||||
await MongoDatasetData.updateOne({ _id: dataId }, { $set: { imageId: key } });
|
||||
|
||||
// 标记迁移为完成
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: dataId },
|
||||
{
|
||||
$set: {
|
||||
status: 'completed',
|
||||
completedAt: new Date()
|
||||
},
|
||||
$push: {
|
||||
operations: {
|
||||
action: 'update_data_imageId',
|
||||
timestamp: new Date(),
|
||||
success: true,
|
||||
duration: Date.now() - updateStartTime,
|
||||
details: {
|
||||
newImageId: key
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return {
|
||||
dataId,
|
||||
key
|
||||
};
|
||||
} catch (error) {
|
||||
// 标记迁移为失败
|
||||
await MongoDatasetMigrationLog.updateOne(
|
||||
{ batchId, resourceId: dataId },
|
||||
{
|
||||
$set: {
|
||||
status: 'failed',
|
||||
'error.message': error instanceof Error ? error.message : String(error),
|
||||
'error.stack': error instanceof Error ? error.stack : undefined,
|
||||
'error.phase': 'update_db'
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
addLog.error(`[Migration ${batchId}] Failed to update data ${dataId}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 批量删除已完成迁移的 S3 文件的 TTL
|
||||
async function removeTTLForCompletedMigrations(batchId: string) {
|
||||
try {
|
||||
|
|
@ -542,6 +920,54 @@ async function handler(req: NextApiRequest, _res: NextApiResponse) {
|
|||
// ========== 批量删除已完成迁移的 TTL ==========
|
||||
await removeTTLForCompletedMigrations(batchId);
|
||||
|
||||
// ========== Image Migration ==========
|
||||
addLog.info(`[Migration ${batchId}] Starting image migration...`);
|
||||
|
||||
const totalImageFiles = await getDatasetImageGFSCollection().countDocuments({});
|
||||
addLog.info(`[Migration ${batchId}] Total image files in GridFS: ${totalImageFiles}`);
|
||||
|
||||
let imageStats = {
|
||||
processed: 0,
|
||||
succeeded: 0,
|
||||
failed: 0,
|
||||
skipped: 0
|
||||
};
|
||||
|
||||
// 分批处理 images
|
||||
for (let offset = 0; offset < totalImageFiles; offset += config.imageBatchSize) {
|
||||
const currentBatch = Math.floor(offset / config.imageBatchSize) + 1;
|
||||
const totalBatches = Math.ceil(totalImageFiles / config.imageBatchSize);
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Processing images batch ${currentBatch}/${totalBatches} (${offset}-${offset + config.imageBatchSize})`
|
||||
);
|
||||
|
||||
const batchStats = await processImageBatch({
|
||||
batchId,
|
||||
migrationVersion,
|
||||
offset,
|
||||
limit: config.imageBatchSize,
|
||||
concurrency: config.imageConcurrency
|
||||
});
|
||||
|
||||
imageStats.processed += batchStats.processed;
|
||||
imageStats.succeeded += batchStats.succeeded;
|
||||
imageStats.failed += batchStats.failed;
|
||||
imageStats.skipped += batchStats.skipped;
|
||||
|
||||
addLog.info(
|
||||
`[Migration ${batchId}] Batch ${currentBatch}/${totalBatches} completed. Batch: +${batchStats.succeeded} succeeded, +${batchStats.failed} failed. Total progress: ${imageStats.succeeded}/${totalImageFiles}`
|
||||
);
|
||||
|
||||
// 暂停一下
|
||||
if (offset + config.imageBatchSize < totalImageFiles) {
|
||||
await new Promise((resolve) => setTimeout(resolve, config.pauseBetweenBatches));
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 批量删除已完成迁移的 TTL ==========
|
||||
await removeTTLForCompletedMigrations(batchId);
|
||||
|
||||
// ========== 汇总统计 ==========
|
||||
addLog.info(`[Migration ${batchId}] ========== Migration Summary ==========`);
|
||||
addLog.info(
|
||||
|
|
@@ -562,6 +988,13 @@ async function handler(req: NextApiRequest, _res: NextApiResponse) {
        succeeded: collectionStats.succeeded,
        failed: collectionStats.failed,
        skipped: collectionStats.skipped
      },
      images: {
        total: totalImageFiles,
        processed: imageStats.processed,
        succeeded: imageStats.succeeded,
        failed: imageStats.failed,
        skipped: imageStats.skipped
      }
    }
  };
@@ -71,7 +71,7 @@ export default NextAPI(handler);
const testLLMModel = async (model: LLMModelItemType, headers: Record<string, string>) => {
  const { answerText } = await createLLMResponse({
    body: {
      model: model.model,
      model,
      messages: [{ role: 'user', content: 'hi' }],
      stream: true
    },
@@ -80,8 +80,6 @@ async function handler(req: ApiRequestProps<GetQuoteDataProps>): Promise<GetQuot
    return {
      collection,
      ...formatDatasetDataValue({
        teamId: datasetData.teamId,
        datasetId: datasetData.datasetId,
        q: datasetData.q,
        a: datasetData.a,
        imageId: datasetData.imageId
@@ -98,8 +96,6 @@ async function handler(req: ApiRequestProps<GetQuoteDataProps>): Promise<GetQuot
    return {
      collection,
      ...formatDatasetDataValue({
        teamId: datasetData.teamId,
        datasetId: datasetData.datasetId,
        q: datasetData.q,
        a: datasetData.a,
        imageId: datasetData.imageId
@ -406,32 +406,59 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
|
|||
|
||||
res.end();
|
||||
} else {
|
||||
const responseContent = (() => {
|
||||
if (assistantResponses.length === 0) return '';
|
||||
if (assistantResponses.length === 1 && assistantResponses[0].text?.content)
|
||||
return assistantResponses[0].text?.content;
|
||||
|
||||
if (!detail) {
|
||||
return assistantResponses
|
||||
.map((item) => item?.text?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
const formatResponseContent = removeAIResponseCite(assistantResponses, retainDatasetCite);
|
||||
const formattdResponse = (() => {
|
||||
if (formatResponseContent.length === 0)
|
||||
return {
|
||||
reasoning: '',
|
||||
content: ''
|
||||
};
|
||||
if (formatResponseContent.length === 1) {
|
||||
return {
|
||||
reasoning: formatResponseContent[0].reasoning?.content,
|
||||
content: formatResponseContent[0].text?.content
|
||||
};
|
||||
}
|
||||
|
||||
return assistantResponses;
|
||||
if (!detail) {
|
||||
return {
|
||||
reasoning: formatResponseContent
|
||||
.map((item) => item?.reasoning?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n'),
|
||||
content: formatResponseContent
|
||||
.map((item) => item?.text?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n')
|
||||
};
|
||||
}
|
||||
|
||||
return formatResponseContent;
|
||||
})();
|
||||
const formatResponseContent = removeAIResponseCite(responseContent, retainDatasetCite);
|
||||
const error = flowResponses[flowResponses.length - 1]?.error;
|
||||
|
||||
const error =
|
||||
flowResponses[flowResponses.length - 1]?.error ||
|
||||
flowResponses[flowResponses.length - 1]?.errorText;
|
||||
|
||||
res.json({
|
||||
...(detail ? { responseData: feResponseData, newVariables } : {}),
|
||||
error,
|
||||
id: saveChatId,
|
||||
id: chatId || '',
|
||||
model: '',
|
||||
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 1 },
|
||||
choices: [
|
||||
{
|
||||
message: { role: 'assistant', content: formatResponseContent },
|
||||
message: {
|
||||
role: 'assistant',
|
||||
...(Array.isArray(formattdResponse)
|
||||
? { content: formattdResponse }
|
||||
: {
|
||||
content: formattdResponse.content,
|
||||
...(formattdResponse.reasoning && {
|
||||
reasoning_content: formattdResponse.reasoning
|
||||
})
|
||||
})
|
||||
},
|
||||
finish_reason: 'stop',
|
||||
index: 0
|
||||
}
|
||||
|
|
|
|||
|
|
@ -402,22 +402,39 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
|
|||
|
||||
res.end();
|
||||
} else {
|
||||
const responseContent = (() => {
|
||||
if (assistantResponses.length === 0) return '';
|
||||
if (assistantResponses.length === 1 && assistantResponses[0].text?.content)
|
||||
return assistantResponses[0].text?.content;
|
||||
|
||||
if (!detail) {
|
||||
return assistantResponses
|
||||
.map((item) => item?.text?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
const formatResponseContent = removeAIResponseCite(assistantResponses, retainDatasetCite);
|
||||
const formattdResponse = (() => {
|
||||
if (formatResponseContent.length === 0)
|
||||
return {
|
||||
reasoning: '',
|
||||
content: ''
|
||||
};
|
||||
if (formatResponseContent.length === 1) {
|
||||
return {
|
||||
reasoning: formatResponseContent[0].reasoning?.content,
|
||||
content: formatResponseContent[0].text?.content
|
||||
};
|
||||
}
|
||||
|
||||
return assistantResponses;
|
||||
if (!detail) {
|
||||
return {
|
||||
reasoning: formatResponseContent
|
||||
.map((item) => item?.reasoning?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n'),
|
||||
content: formatResponseContent
|
||||
.map((item) => item?.text?.content)
|
||||
.filter(Boolean)
|
||||
.join('\n')
|
||||
};
|
||||
}
|
||||
|
||||
return formatResponseContent;
|
||||
})();
|
||||
const formatResponseContent = removeAIResponseCite(responseContent, retainDatasetCite);
|
||||
const error = flowResponses[flowResponses.length - 1]?.error;
|
||||
|
||||
const error =
|
||||
flowResponses[flowResponses.length - 1]?.error ||
|
||||
flowResponses[flowResponses.length - 1]?.errorText;
|
||||
|
||||
res.json({
|
||||
...(detail ? { responseData: feResponseData, newVariables } : {}),
|
||||
|
|
@ -427,7 +444,17 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
|
|||
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 1 },
|
||||
choices: [
|
||||
{
|
||||
message: { role: 'assistant', content: formatResponseContent },
|
||||
message: {
|
||||
role: 'assistant',
|
||||
...(Array.isArray(formattdResponse)
|
||||
? { content: formattdResponse }
|
||||
: {
|
||||
content: formattdResponse.content,
|
||||
...(formattdResponse.reasoning && {
|
||||
reasoning_content: formattdResponse.reasoning
|
||||
})
|
||||
})
|
||||
},
|
||||
finish_reason: 'stop',
|
||||
index: 0
|
||||
}
|
||||
|
|
|
|||