FastGPT/packages/service/common/s3/sources/dataset/index.ts
Archer 2ccb5b50c6
Some checks are pending
Document deploy / sync-images (push) Waiting to run
Document deploy / generate-timestamp (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.cn suffix:cn]) (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.io suffix:io]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.cn kube_config:KUBE_CONFIG_CN suffix:cn]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.io kube_config:KUBE_CONFIG_IO suffix:io]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / get-vars (push) Waiting to run
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:amd64 runs-on:ubuntu-24.04]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:arm64 runs-on:ubuntu-24.04-arm]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / release-fastgpt-images (push) Blocked by required conditions
V4.14.4 features (#6036)
* feat: add query optimize and bill (#6021)

* add query optimize and bill

* perf: query extension

* fix: embe model

* remove log

* remove log

* fix: test

---------

Co-authored-by: xxyyh <2289112474@qq>
Co-authored-by: archer <545436317@qq.com>

* feat: notice (#6013)

* feat: record user's language

* feat: notice points/dataset indexes; support count limit; update docker-compose.yml

* fix: ts error

* feat: send auth code i18n

* chore: dataset notice limit

* chore: adjust

* fix: ts

* fix: countLimit race condition; i18n en-prefix locale fallback to en

---------

Co-authored-by: archer <545436317@qq.com>

* perf: comment

* perf: send inform code

* fix: type error (#6029)

* feat: add ip region for chat logs (#6010)

* feat: add ip region for chat logs

* refactor: use Geolite2.mmdb

* fix: export chat logs

* fix: return location directly

* test: add unit test

* perf: log show ip data

* adjust commercial plans (#6008)

* plan frontend

* plan limit

* coupon

* discount coupon

* fix

* type

* fix audit

* type

* plan name

* legacy plan

* track

* feat: add discount coupon

* fix

* fix discount coupon

* openapi

* type

* type

* env

* api type

* fix

* fix: simple agent plugin input & agent dashboard card (#6034)

* refactor: remove gridfs (#6031)

* fix: replace gridfs multer operations with s3 compatible ops

* wip: s3 features

* refactor: remove gridfs

* fix

* perf: mock test

* doc

* doc

* doc

* fix: test

* fix: s3

* fix: mock s3

* remove invalid config

* fix: init query extension

* initv4144 (#6037)

* chore: initv4144

* fix

* version

* fix: new plans (#6039)

* fix: new plans

* qr modal tip

* fix: buffer raw text filename (#6040)

* fix: initv4144 (#6041)

* fix: pay refresh (#6042)

* fix: migration shell

* rename collection

* clear timerlock

* clear timerlock

* perf: faq

* perf: bill schema

* fix: openapi

* doc

* fix: share var render

* feat: delete dataset queue

* plan usage display (#6043)

* plan usage display

* text

* fix

* fix: ts

* perf: remove invalid code

* perf: init shell

* doc

* perf: rename field

* perf: avatar presign

* init

* custom plan text (#6045)

* fix plans

* fix

* fixed

* computed

---------

Co-authored-by: archer <545436317@qq.com>

* init shell

* plan text & price page back button (#6046)

* init

* index

* delete dataset

* delete dataset

* perf: delete dataset

* init

---------

Co-authored-by: YeYuheng <57035043+YYH211@users.noreply.github.com>
Co-authored-by: xxyyh <2289112474@qq>
Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>
Co-authored-by: Roy <whoeverimf5@gmail.com>
Co-authored-by: heheer <heheer@sealos.io>
2025-12-08 01:44:15 +08:00

265 lines
8.0 KiB
TypeScript

import { S3Sources } from '../../type';
import { S3PrivateBucket } from '../../buckets/private';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
import {
type AddRawTextBufferParams,
AddRawTextBufferParamsSchema,
type CreateGetDatasetFileURLParams,
CreateGetDatasetFileURLParamsSchema,
type CreateUploadDatasetFileParams,
CreateUploadDatasetFileParamsSchema,
type DeleteDatasetFilesByPrefixParams,
DeleteDatasetFilesByPrefixParamsSchema,
type GetDatasetFileContentParams,
GetDatasetFileContentParamsSchema,
type GetRawTextBufferParams,
type UploadParams,
UploadParamsSchema
} from './type';
import { MongoS3TTL } from '../../schema';
import { addHours, addMinutes } from 'date-fns';
import { addLog } from '../../../system/log';
import { detectFileEncoding } from '@fastgpt/global/common/file/tools';
import { readS3FileContentByBuffer } from '../../../file/read/utils';
import path from 'node:path';
import { Mimes } from '../../constants';
import { getFileS3Key, truncateFilename } from '../../utils';
import { createHash } from 'node:crypto';
import { S3Error } from 'minio';
/**
 * Dataset-scoped facade over the private S3 bucket.
 *
 * Groups the dataset file operations used by this module: presigned
 * upload/download URLs, deletion jobs, reading objects and their metadata,
 * direct uploads, and a short-lived cache of parsed raw text.
 */
export class S3DatasetSource {
  public bucket: S3PrivateBucket;
  private static instance: S3DatasetSource;

  constructor() {
    this.bucket = new S3PrivateBucket();
  }

  /** Returns the shared instance, creating it on first use. */
  static getInstance() {
    return (this.instance ??= new S3DatasetSource());
  }

  /**
   * Decodes a percent-encoded `origin-filename` metadata value.
   *
   * @param raw Stored metadata value; may be absent.
   * @returns The decoded name, `''` when the value is missing, or the raw
   *   value unchanged when it is not valid percent-encoding (instead of
   *   letting `decodeURIComponent` throw a `URIError`).
   */
  private static decodeFilename(raw?: string): string {
    if (!raw) return '';
    try {
      return decodeURIComponent(raw);
    } catch {
      return raw;
    }
  }

  /**
   * Creates a download link for a dataset file.
   *
   * @returns An external URL when `external` is set, otherwise a preview URL.
   * @throws When `params` does not satisfy `CreateGetDatasetFileURLParamsSchema`.
   */
  async createGetDatasetFileURL(params: CreateGetDatasetFileURLParams) {
    const { key, expiredHours, external } = CreateGetDatasetFileURLParamsSchema.parse(params);

    if (external) {
      return await this.bucket.createExternalUrl({ key, expiredHours });
    }
    return await this.bucket.createPreviewUrl({ key, expiredHours });
  }

  /**
   * Creates a presigned POST upload link for a dataset file.
   * The link is requested with an expiry of 3 hours.
   *
   * @throws When `params` does not satisfy `CreateUploadDatasetFileParamsSchema`.
   */
  async createUploadDatasetFileURL(params: CreateUploadDatasetFileParams) {
    const { filename, datasetId } = CreateUploadDatasetFileParamsSchema.parse(params);
    const { fileKey } = getFileS3Key.dataset({ datasetId, filename });

    return await this.bucket.createPostPresignedUrl(
      { rawKey: fileKey, filename },
      { expiredHours: 3 }
    );
  }

  /**
   * Queues deletion of every object stored under `<S3Sources.dataset>/<datasetId>`.
   *
   * NOTE(review): falsy segments are dropped before joining, so a missing
   * `datasetId` would produce a prefix covering the whole dataset root.
   * Confirm that `DeleteDatasetFilesByPrefixParamsSchema` makes `datasetId`
   * mandatory.
   *
   * @throws When `params` does not satisfy `DeleteDatasetFilesByPrefixParamsSchema`.
   */
  deleteDatasetFilesByPrefix(params: DeleteDatasetFilesByPrefixParams) {
    const { datasetId } = DeleteDatasetFilesByPrefixParamsSchema.parse(params);
    const prefix = [S3Sources.dataset, datasetId].filter(Boolean).join('/');
    return this.bucket.addDeleteJob({ prefix });
  }

  /** Queues deletion of a single object by key. */
  deleteDatasetFileByKey(key?: string) {
    return this.bucket.addDeleteJob({ key });
  }

  /** Queues deletion of several objects by key. */
  deleteDatasetFilesByKeys(keys: string[]) {
    return this.bucket.addDeleteJob({ keys });
  }

  /** Fetches the object stored at `key` from the bucket. */
  getDatasetFileStream(key: string) {
    return this.bucket.getObject(key);
  }

  /**
   * Looks up the stat record of the object stored at `key`.
   *
   * The lookup is awaited inside the `try` block so that an asynchronous
   * "Not Found" rejection is actually seen by the `catch` clause.
   *
   * @returns The stat record, or `null` when the object does not exist.
   * @throws Any other error raised by the bucket.
   */
  async getDatasetFileStat(key: string) {
    try {
      return await this.bucket.statObject(key);
    } catch (error) {
      if (error instanceof S3Error && error.message === 'Not Found') {
        return null;
      }
      throw error;
    }
  }

  /**
   * Reads the display metadata of the object stored at `key`.
   *
   * @returns Decoded original filename, its extension, content type and size.
   *   Every field is empty/zero when the object does not exist.
   */
  async getFileMetadata(key: string) {
    const stat = await this.getDatasetFileStat(key);
    if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' };

    const contentLength = stat.size;
    const filename = S3DatasetSource.decodeFilename(stat.metaData['origin-filename']);
    const extension = parseFileExtensionFromUrl(filename);
    const contentType: string = stat.metaData['content-type'] ?? '';

    return {
      filename,
      extension,
      contentType,
      contentLength
    };
  }

  /**
   * Loads an object and returns it as a `data:` URL.
   * Falls back to `image/jpeg` when no content type is recorded.
   */
  async getDatasetBase64Image(key: string): Promise<string> {
    const [stream, metadata] = await Promise.all([
      this.getDatasetFileStream(key),
      this.getFileMetadata(key)
    ]);
    const buffer = await this.bucket.fileStreamToBuffer(stream);
    const base64 = buffer.toString('base64');
    return `data:${metadata.contentType || 'image/jpeg'};base64,${base64}`;
  }

  /**
   * Returns the parsed raw text of a dataset file.
   *
   * A cached copy is returned when one exists for this file and
   * `customPdfParse` setting. Otherwise the file is downloaded, parsed, and
   * the result is written to the cache in the background.
   *
   * @throws When `params` does not satisfy `GetDatasetFileContentParamsSchema`,
   *   or when reading/parsing the file fails.
   */
  async getDatasetFileRawText(params: GetDatasetFileContentParams) {
    const { fileId, teamId, tmbId, customPdfParse, getFormatText, usageId } =
      GetDatasetFileContentParamsSchema.parse(params);

    const rawTextBuffer = await this.getRawTextBuffer({ customPdfParse, sourceId: fileId });
    if (rawTextBuffer) {
      return {
        rawText: rawTextBuffer.text,
        filename: rawTextBuffer.filename
      };
    }

    const [metadata, stream] = await Promise.all([
      this.getFileMetadata(fileId),
      this.getDatasetFileStream(fileId)
    ]);

    // `getFileMetadata` already decoded the name; decoding again would throw
    // on (or corrupt) names that contain a literal '%'.
    const { extension, filename } = metadata;

    const start = Date.now();
    const buffer = await this.bucket.fileStreamToBuffer(stream);
    addLog.debug('get dataset file buffer', { time: Date.now() - start });

    const encoding = detectFileEncoding(buffer);
    const { fileParsedPrefix } = getFileS3Key.s3Key(fileId);
    const { rawText } = await readS3FileContentByBuffer({
      teamId,
      tmbId,
      extension,
      buffer,
      encoding,
      customPdfParse,
      usageId,
      getFormatText,
      imageKeyOptions: {
        prefix: fileParsedPrefix
      }
    });

    // Best-effort cache write: it is not awaited, and a failure is logged
    // instead of surfacing as an unhandled promise rejection.
    void this.addRawTextBuffer({
      sourceId: fileId,
      sourceName: filename,
      text: rawText,
      customPdfParse
    }).catch((error: unknown) => {
      addLog.error('Failed to cache dataset raw text', error);
    });

    return {
      rawText,
      filename
    };
  }

  /**
   * Uploads a file (given as a buffer or as a stream plus size) to the bucket.
   *
   * @returns The object key the file was stored under.
   * @throws When `params` does not satisfy `UploadParamsSchema`, or when the
   *   TTL record or the upload fails.
   */
  async upload(params: UploadParams): Promise<string> {
    const { datasetId, filename, ...file } = UploadParamsSchema.parse(params);

    // Truncate the filename so the resulting S3 key does not become too long.
    const truncatedFilename = truncateFilename(filename);
    const { fileKey: key } = getFileS3Key.dataset({ datasetId, filename: truncatedFilename });

    const { stream, size } =
      'buffer' in file
        ? { stream: file.buffer, size: file.buffer.length }
        : { stream: file.stream, size: file.size };

    // Register a 3-hour TTL record before the object is written.
    await MongoS3TTL.create({
      minioKey: key,
      bucketName: this.bucket.name,
      expiredTime: addHours(new Date(), 3)
    });

    // Try the extension as written first, then lower-cased, so that names
    // such as "REPORT.PDF" still resolve to a content type.
    const extension = path.extname(truncatedFilename);
    const lookupMime = (ext: string): string | undefined => Mimes[ext as keyof typeof Mimes];
    const contentType = lookupMime(extension) ?? lookupMime(extension.toLowerCase());

    await this.bucket.putObject(key, stream, size, {
      'content-type': contentType,
      'upload-time': new Date().toISOString(),
      'origin-filename': encodeURIComponent(truncatedFilename)
    });

    return key;
  }

  /**
   * Stores parsed raw text as a `text/plain` object with a 20-minute TTL record.
   *
   * @returns The object key of the cached text.
   * @throws When `params` does not satisfy `AddRawTextBufferParamsSchema`, or
   *   when the TTL record or the upload fails.
   */
  async addRawTextBuffer(params: AddRawTextBufferParams) {
    const { sourceId, sourceName, text, customPdfParse } =
      AddRawTextBufferParamsSchema.parse(params);

    // A key maps to exactly one object, so hashing the key is enough; there
    // is no need to hash the file content.
    const hash = createHash('md5').update(sourceId).digest('hex');
    const key = getFileS3Key.rawText({ hash, customPdfParse });

    await MongoS3TTL.create({
      minioKey: key,
      bucketName: this.bucket.name,
      expiredTime: addMinutes(new Date(), 20)
    });

    const buffer = Buffer.from(text);
    await this.bucket.putObject(key, buffer, buffer.length, {
      'content-type': 'text/plain',
      'origin-filename': encodeURIComponent(sourceName),
      'upload-time': new Date().toISOString()
    });

    return key;
  }

  /**
   * Reads cached raw text previously stored by `addRawTextBuffer`.
   *
   * @returns The cached text and original filename, or `null` when no cached
   *   object exists for this source and `customPdfParse` setting.
   */
  async getRawTextBuffer(params: GetRawTextBufferParams) {
    const { customPdfParse, sourceId } = params;

    const hash = createHash('md5').update(sourceId).digest('hex');
    const key = getFileS3Key.rawText({ hash, customPdfParse });

    if (!(await this.bucket.isObjectExists(key))) return null;

    const [stream, metadata] = await Promise.all([
      this.bucket.getObject(key),
      this.getFileMetadata(key)
    ]);
    const buffer = await this.bucket.fileStreamToBuffer(stream);

    return {
      text: buffer.toString('utf-8'),
      filename: metadata.filename
    };
  }
}
/**
 * Convenience accessor for the shared `S3DatasetSource` instance.
 *
 * @returns The instance produced by `S3DatasetSource.getInstance()`.
 */
export function getS3DatasetSource() {
  const source = S3DatasetSource.getInstance();
  return source;
}