mirror of https://github.com/labring/FastGPT.git
synced 2025-12-25 20:02:47 +00:00

perf: s3 controller

parent 9f09fbffbb
commit 744fc925f6
@@ -15,6 +15,7 @@ import { addLog } from '../../system/log';
 import { addS3DelJob } from '../mq';
 import { type Readable } from 'node:stream';
 import { type UploadFileByBufferParams, UploadFileByBufferSchema } from '../type';
+import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';

 export class S3BaseBucket {
   private _client: Client;
@@ -26,7 +27,7 @@ export class S3BaseBucket {
    * @param options the options for the s3 client
    */
   constructor(
-    private readonly bucketName: string,
+    public readonly bucketName: string,
     public options: Partial<S3OptionsType> = defaultS3Options
   ) {
     options = { ...defaultS3Options, ...options };
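Note: switching the constructor parameter from private to public readonly uses TypeScript's parameter-property shorthand, which is what lets every this.name call below become this.bucketName and lets the source subclasses introduced later in this commit read the field directly. A minimal sketch of the pattern (illustrative names, not the actual FastGPT classes):

class BaseBucket {
  // `public readonly` both declares the field and assigns the constructor argument to it.
  constructor(public readonly bucketName: string) {}
}

class PrivateBucket extends BaseBucket {
  constructor() {
    super('private-bucket');
  }

  label(): string {
    // Subclasses can read the field directly, so no `get name()` indirection is needed.
    return `bucket=${this.bucketName}`;
  }
}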
@@ -56,20 +57,18 @@ export class S3BaseBucket {
     }

     const init = async () => {
-      if (!(await this.exist())) {
-        // Not exists bucket, create it
+      if (!(await this.client.bucketExists(this.bucketName))) {
         await this.client.makeBucket(this.bucketName);
       }
       await this.options.afterInit?.();
-      console.log(`S3 init success: ${this.name}`);
+      console.log(`S3 init success: ${this.bucketName}`);
     };
     if (this.options.init) {
       init();
     }
   }

-  get name(): string {
-    return this.bucketName;
-  }
   get client(): Client {
     return this._client;
   }
@@ -110,21 +109,17 @@
       copyConditions?: CopyConditions;
     };
   }): ReturnType<Client['copyObject']> {
-    const bucket = this.name;
+    const bucket = this.bucketName;
     if (options?.temporary) {
       await MongoS3TTL.create({
         minioKey: to,
-        bucketName: this.name,
+        bucketName: this.bucketName,
         expiredTime: addHours(new Date(), 24)
       });
     }
     return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions);
   }

-  exist(): Promise<boolean> {
-    return this.client.bucketExists(this.name);
-  }
-
   async delete(objectKey: string, options?: RemoveOptions): Promise<void> {
     try {
       if (!objectKey) return Promise.resolve();
@@ -133,11 +128,11 @@
       const fileParsedPrefix = `${path.dirname(objectKey)}/${path.basename(objectKey, path.extname(objectKey))}-parsed`;
       await this.addDeleteJob({ prefix: fileParsedPrefix });

-      return await this.client.removeObject(this.name, objectKey, options);
+      return await this.client.removeObject(this.bucketName, objectKey, options);
     } catch (error) {
       if (error instanceof S3Error) {
         if (error.code === 'InvalidObjectName') {
-          addLog.warn(`${this.name} delete object not found: ${objectKey}`, error);
+          addLog.warn(`${this.bucketName} delete object not found: ${objectKey}`, error);
           return Promise.resolve();
         }
       }
@@ -145,27 +140,43 @@
     }
   }

   // List files
   listObjectsV2(
     ...params: Parameters<Client['listObjectsV2']> extends [string, ...infer R] ? R : never
   ) {
-    return this.client.listObjectsV2(this.name, ...params);
+    return this.client.listObjectsV2(this.bucketName, ...params);
   }

   // Upload a file
   putObject(...params: Parameters<Client['putObject']> extends [string, ...infer R] ? R : never) {
-    return this.client.putObject(this.name, ...params);
+    return this.client.putObject(this.bucketName, ...params);
   }

-  getObject(...params: Parameters<Client['getObject']> extends [string, ...infer R] ? R : never) {
-    return this.client.getObject(this.name, ...params);
+  // Get a file stream
+  getFileStream(
+    ...params: Parameters<Client['getObject']> extends [string, ...infer R] ? R : never
+  ) {
+    return this.client.getObject(this.bucketName, ...params);
   }

-  statObject(...params: Parameters<Client['statObject']> extends [string, ...infer R] ? R : never) {
-    return this.client.statObject(this.name, ...params);
+  // Get file stat; resolves to null when the object does not exist
+  async statObject(
+    ...params: Parameters<Client['statObject']> extends [string, ...infer R] ? R : never
+  ) {
+    try {
+      return await this.client.statObject(this.bucketName, ...params);
+    } catch (error) {
+      if (error instanceof S3Error && error.message === 'Not Found') {
+        return null;
+      }
+      return Promise.reject(error);
+    }
   }

   // Check whether the file exists
   async isObjectExists(key: string): Promise<boolean> {
     try {
-      await this.client.statObject(this.name, key);
+      await this.client.statObject(this.bucketName, key);
       return true;
     } catch (err) {
       if (err instanceof S3Error && err.message === 'Not Found') {
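Note: statObject now resolves to null on 'Not Found' instead of throwing, so callers can use a plain guard. A hypothetical call site (not part of this commit):

async function printObjectSize(bucket: S3BaseBucket, key: string) {
  const stat = await bucket.statObject(key);
  if (!stat) {
    // A missing object is now an ordinary value, not an exception path.
    console.log(`${key}: not found`);
    return;
  }
  console.log(`${key}: ${stat.size} bytes`);
}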
@@ -175,6 +186,7 @@
     }
   }

+  // Convert a file stream into a Buffer
   async fileStreamToBuffer(stream: Readable): Promise<Buffer> {
     const chunks: Buffer[] = [];
     for await (const chunk of stream) {
@@ -184,7 +196,7 @@
   }

   addDeleteJob(params: Omit<Parameters<typeof addS3DelJob>[0], 'bucketName'>) {
-    return addS3DelJob({ ...params, bucketName: this.name });
+    return addS3DelJob({ ...params, bucketName: this.bucketName });
   }

   async createPostPresignedUrl(
@@ -202,7 +214,7 @@

     const policy = this.externalClient.newPostPolicy();
     policy.setKey(key);
-    policy.setBucket(this.name);
+    policy.setBucket(this.bucketName);
     policy.setContentType(contentType);
     if (formatMaxFileSize) {
       policy.setContentLengthRange(1, formatMaxFileSize);
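For context, this method builds a presigned POST policy with the minio-js API seen above. A condensed sketch of the flow, with illustrative bucket/key/size values rather than the ones FastGPT derives from the request:

import { Client } from 'minio';

async function createUploadPolicy(client: Client) {
  const policy = client.newPostPolicy();
  policy.setBucket('private-bucket');                       // illustrative bucket name
  policy.setKey('dataset/123/report.pdf');                  // illustrative object key
  policy.setContentType('application/pdf');
  policy.setContentLengthRange(1, 50 * 1024 * 1024);        // 1 byte up to 50 MB
  policy.setExpires(new Date(Date.now() + 60 * 60 * 1000)); // policy valid for 1 hour
  // Resolves to the POST URL plus the form fields the browser must include.
  return client.presignedPostPolicy(policy);
}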
@@ -220,7 +232,7 @@
     if (expiredHours) {
       await MongoS3TTL.create({
         minioKey: key,
-        bucketName: this.name,
+        bucketName: this.bucketName,
         expiredTime: addHours(new Date(), expiredHours)
       });
     }
@@ -242,7 +254,7 @@
     const { key, expiredHours } = parsed;
     const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires is in seconds; defaults to 30 minutes

-    return await this.externalClient.presignedGetObject(this.name, key, expires);
+    return await this.externalClient.presignedGetObject(this.bucketName, key, expires);
   }

   async createPreviewUrl(params: createPreviewUrlParams) {
@@ -251,7 +263,7 @@
     const { key, expiredHours } = parsed;
     const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires is in seconds; defaults to 30 minutes

-    return await this.client.presignedGetObject(this.name, key, expires);
+    return await this.client.presignedGetObject(this.bucketName, key, expires);
   }

   async uploadFileByBuffer(params: UploadFileByBufferParams) {
@@ -259,7 +271,7 @@

     await MongoS3TTL.create({
       minioKey: key,
-      bucketName: this.name,
+      bucketName: this.bucketName,
       expiredTime: addHours(new Date(), 1)
     });
     await this.putObject(key, buffer, undefined, {
@@ -274,4 +286,22 @@
       })
     };
   }
+
+  // Wrappers exposed to callers
+  // Get file metadata
+  async getFileMetadata(key: string) {
+    const stat = await this.statObject(key);
+    if (!stat) return;
+
+    const contentLength = stat.size;
+    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
+    const extension = parseFileExtensionFromUrl(filename);
+    const contentType: string = stat.metaData['content-type'];
+    return {
+      filename,
+      extension,
+      contentType,
+      contentLength
+    };
+  }
 }
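Note: getFileMetadata is the shared helper that the chat and dataset sources previously duplicated; it returns undefined when the object is missing. A hypothetical caller (key is illustrative):

async function logMetadata(bucket: S3BaseBucket, key: string) {
  const metadata = await bucket.getFileMetadata(key);
  if (!metadata) return; // undefined when the object does not exist
  const { filename, extension, contentType, contentLength } = metadata;
  console.log(`${filename} (${extension}, ${contentType}): ${contentLength} bytes`);
}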
@@ -7,7 +7,7 @@ export class S3PublicBucket extends S3BaseBucket {
     super(S3Buckets.public, {
       ...options,
       afterInit: async () => {
-        const bucket = this.name;
+        const bucket = this.bucketName;
         const policy = JSON.stringify({
           Version: '2012-10-17',
           Statement: [
@@ -34,7 +34,7 @@
     const protocol = this.options.useSSL ? 'https' : 'http';
     const hostname = this.options.endPoint;
     const port = this.options.port;
-    const bucket = this.name;
+    const bucket = this.bucketName;

     const url = new URL(`${protocol}://${hostname}:${port}/${bucket}/${objectKey}`);
@@ -5,11 +5,9 @@ import { imageBaseUrl } from '@fastgpt/global/common/file/image/constants';
 import type { ClientSession } from 'mongoose';
 import { getFileS3Key } from '../utils';

-class S3AvatarSource {
-  private bucket: S3PublicBucket;
-
+class S3AvatarSource extends S3PublicBucket {
   constructor() {
-    this.bucket = new S3PublicBucket();
+    super();
   }

   get prefix(): string {
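Note: each source class in this commit switches from wrapping a bucket instance to extending the bucket class, which is what deletes the forwarding wrappers (createPublicUrl, getChatFileStream, and so on) in the hunks below. A simplified before/after sketch of the refactor (illustrative class names):

// Before: composition, so every bucket method needed a hand-written forwarder.
class AvatarSourceBefore {
  private bucket = new S3PublicBucket();

  createPublicUrl(objectKey: string): string {
    return this.bucket.createPublicUrl(objectKey);
  }
}

// After: inheritance, so bucket methods are available on `this` with no forwarders.
class AvatarSourceAfter extends S3PublicBucket {
  constructor() {
    super();
  }
}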
@@ -27,7 +25,7 @@
   }) {
     const { fileKey } = getFileS3Key.avatar({ teamId, filename });

-    return this.bucket.createPostPresignedUrl(
+    return this.createPostPresignedUrl(
       { filename, rawKey: fileKey },
       {
         expiredHours: autoExpired ? 1 : undefined, // 1 Hours
@@ -36,19 +34,15 @@
     );
   }

-  createPublicUrl(objectKey: string): string {
-    return this.bucket.createPublicUrl(objectKey);
-  }
-
   async removeAvatarTTL(avatar: string, session?: ClientSession): Promise<void> {
     const key = avatar.slice(this.prefix.length);
-    await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucket.name }, session);
+    await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucketName }, session);
   }

   async deleteAvatar(avatar: string, session?: ClientSession): Promise<void> {
     const key = avatar.slice(this.prefix.length);
-    await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucket.name }, session);
-    await this.bucket.delete(key);
+    await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucketName }, session);
+    await this.delete(key);
   }

   async refreshAvatar(newAvatar?: string, oldAvatar?: string, session?: ClientSession) {
@@ -78,7 +72,7 @@
   }) {
     const from = key.slice(this.prefix.length);
     const to = `${S3Sources.avatar}/${teamId}/${filename}`;
-    await this.bucket.copy({ from, to, options: { temporary } });
+    await this.copy({ from, to, options: { temporary } });
     return this.prefix.concat(to);
   }
 }
@@ -1,4 +1,3 @@
-import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { S3PrivateBucket } from '../../buckets/private';
 import { S3Sources } from '../../type';
 import {
@@ -14,11 +13,9 @@ import { S3Buckets } from '../../constants';
 import path from 'path';
 import { getFileS3Key } from '../../utils';

-export class S3ChatSource {
-  private bucket: S3PrivateBucket;
-
+export class S3ChatSource extends S3PrivateBucket {
   constructor() {
-    this.bucket = new S3PrivateBucket();
+    super();
   }

   static parseChatUrl(url: string | URL) {
@@ -51,46 +48,19 @@
     }
   }

-  // Get file stream
-  getChatFileStream(key: string) {
-    return this.bucket.getObject(key);
-  }
-
-  // Get file stat
-  getChatFileStat(key: string) {
-    return this.bucket.statObject(key);
-  }
-
-  // Get file metadata
-  async getFileMetadata(key: string) {
-    const stat = await this.getChatFileStat(key);
-    if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' };
-
-    const contentLength = stat.size;
-    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
-    const extension = parseFileExtensionFromUrl(filename);
-    const contentType: string = stat.metaData['content-type'];
-    return {
-      filename,
-      extension,
-      contentType,
-      contentLength
-    };
-  }
-
   async createGetChatFileURL(params: { key: string; expiredHours?: number; external: boolean }) {
     const { key, expiredHours = 1, external = false } = params; // defaults to 1 hour

     if (external) {
-      return await this.bucket.createExternalUrl({ key, expiredHours });
+      return await this.createExternalUrl({ key, expiredHours });
     }
-    return await this.bucket.createPreviewUrl({ key, expiredHours });
+    return await this.createPreviewUrl({ key, expiredHours });
   }

   async createUploadChatFileURL(params: CheckChatFileKeys) {
     const { appId, chatId, uId, filename, expiredTime } = ChatFileUploadSchema.parse(params);
     const { fileKey } = getFileS3Key.chat({ appId, chatId, uId, filename });
-    return await this.bucket.createPostPresignedUrl(
+    return await this.createPostPresignedUrl(
       { rawKey: fileKey, filename },
       { expiredHours: expiredTime ? differenceInHours(expiredTime, new Date()) : 24 }
     );
@@ -100,11 +70,11 @@
     const { appId, chatId, uId } = DelChatFileByPrefixSchema.parse(params);

     const prefix = [S3Sources.chat, appId, uId, chatId].filter(Boolean).join('/');
-    return this.bucket.addDeleteJob({ prefix });
+    return this.addDeleteJob({ prefix });
   }

   deleteChatFileByKey(key: string) {
-    return this.bucket.addDeleteJob({ key });
+    return this.addDeleteJob({ key });
   }

   async uploadChatFileByBuffer(params: UploadFileParams) {
@@ -117,7 +87,7 @@
       filename
     });

-    return this.bucket.uploadFileByBuffer({
+    return this.uploadFileByBuffer({
       key: fileKey,
       buffer,
       contentType
@@ -2,8 +2,6 @@ import { S3Sources } from '../../type';
 import { S3PrivateBucket } from '../../buckets/private';
-import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import {
-  type AddRawTextBufferParams,
-  AddRawTextBufferParamsSchema,
   type CreateGetDatasetFileURLParams,
   CreateGetDatasetFileURLParamsSchema,
   type CreateUploadDatasetFileParams,
@@ -12,26 +10,26 @@
   DeleteDatasetFilesByPrefixParamsSchema,
   type GetDatasetFileContentParams,
   GetDatasetFileContentParamsSchema,
-  type GetRawTextBufferParams,
   type UploadParams,
   UploadParamsSchema
 } from './type';
 import { MongoS3TTL } from '../../schema';
-import { addHours, addMinutes } from 'date-fns';
+import { addHours } from 'date-fns';
 import { addLog } from '../../../system/log';
 import { detectFileEncoding } from '@fastgpt/global/common/file/tools';
 import { readS3FileContentByBuffer } from '../../../file/read/utils';
 import path from 'node:path';
 import { Mimes } from '../../constants';
 import { getFileS3Key, truncateFilename } from '../../utils';
 import { createHash } from 'node:crypto';
-import { S3Error } from 'minio';
+import type { S3RawTextSource } from '../rawText';
+import { getS3RawTextSource } from '../rawText';

-export class S3DatasetSource {
-  public bucket: S3PrivateBucket;
+export class S3DatasetSource extends S3PrivateBucket {
+  private rawTextSource: S3RawTextSource;

   constructor() {
-    this.bucket = new S3PrivateBucket();
+    super();
     this.rawTextSource = getS3RawTextSource();
   }

   // Download URL
@@ -39,19 +37,26 @@
     const { key, expiredHours, external } = CreateGetDatasetFileURLParamsSchema.parse(params);

     if (external) {
-      return await this.bucket.createExternalUrl({ key, expiredHours });
+      return await this.createExternalUrl({ key, expiredHours });
     }
-    return await this.bucket.createPreviewUrl({ key, expiredHours });
+    return await this.createPreviewUrl({ key, expiredHours });
   }

   // Upload URL
   async createUploadDatasetFileURL(params: CreateUploadDatasetFileParams) {
     const { filename, datasetId } = CreateUploadDatasetFileParamsSchema.parse(params);
     const { fileKey } = getFileS3Key.dataset({ datasetId, filename });
-    return await this.bucket.createPostPresignedUrl(
-      { rawKey: fileKey, filename },
-      { expiredHours: 3 }
-    );
+    return await this.createPostPresignedUrl({ rawKey: fileKey, filename }, { expiredHours: 3 });
   }

+  // Delete by a single key
+  deleteDatasetFileByKey(key?: string) {
+    return this.addDeleteJob({ key });
+  }
+
+  // Delete by multiple keys
+  deleteDatasetFilesByKeys(keys: string[]) {
+    return this.addDeleteJob({ keys });
+  }
+
   /**
@@ -62,68 +67,27 @@
   deleteDatasetFilesByPrefix(params: DeleteDatasetFilesByPrefixParams) {
     const { datasetId } = DeleteDatasetFilesByPrefixParamsSchema.parse(params);
     const prefix = [S3Sources.dataset, datasetId].filter(Boolean).join('/');
-    return this.bucket.addDeleteJob({ prefix });
-  }
-
-  // Delete by a single key
-  deleteDatasetFileByKey(key?: string) {
-    return this.bucket.addDeleteJob({ key });
-  }
-
-  // Delete by multiple keys
-  deleteDatasetFilesByKeys(keys: string[]) {
-    return this.bucket.addDeleteJob({ keys });
-  }
-
-  // Get file stream
-  getDatasetFileStream(key: string) {
-    return this.bucket.getObject(key);
-  }
-
-  // Get file stat
-  getDatasetFileStat(key: string) {
-    try {
-      return this.bucket.statObject(key);
-    } catch (error) {
-      if (error instanceof S3Error && error.message === 'Not Found') {
-        return null;
-      }
-      return Promise.reject(error);
-    }
-  }
-
-  // Get file metadata
-  async getFileMetadata(key: string) {
-    const stat = await this.getDatasetFileStat(key);
-    if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' };
-
-    const contentLength = stat.size;
-    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
-    const extension = parseFileExtensionFromUrl(filename);
-    const contentType: string = stat.metaData['content-type'];
-    return {
-      filename,
-      extension,
-      contentType,
-      contentLength
-    };
+    return this.addDeleteJob({ prefix });
   }

   async getDatasetBase64Image(key: string): Promise<string> {
     const [stream, metadata] = await Promise.all([
-      this.getDatasetFileStream(key),
+      this.getFileStream(key),
       this.getFileMetadata(key)
     ]);
-    const buffer = await this.bucket.fileStreamToBuffer(stream);
+    const buffer = await this.fileStreamToBuffer(stream);
     const base64 = buffer.toString('base64');
-    return `data:${metadata.contentType || 'image/jpeg'};base64,${base64}`;
+    return `data:${metadata?.contentType || 'image/jpeg'};base64,${base64}`;
   }

   async getDatasetFileRawText(params: GetDatasetFileContentParams) {
     const { fileId, teamId, tmbId, customPdfParse, getFormatText, usageId } =
       GetDatasetFileContentParamsSchema.parse(params);

-    const rawTextBuffer = await this.getRawTextBuffer({ customPdfParse, sourceId: fileId });
+    const rawTextBuffer = await this.rawTextSource.getRawTextBuffer({
+      customPdfParse,
+      sourceId: fileId
+    });
     if (rawTextBuffer) {
       return {
         rawText: rawTextBuffer.text,
@@ -133,14 +97,14 @@

     const [metadata, stream] = await Promise.all([
       this.getFileMetadata(fileId),
-      this.getDatasetFileStream(fileId)
+      this.getFileStream(fileId)
     ]);

-    const extension = metadata.extension;
-    const filename: string = decodeURIComponent(metadata.filename);
+    const extension = metadata?.extension || '';
+    const filename: string = decodeURIComponent(metadata?.filename || '');

     const start = Date.now();
-    const buffer = await this.bucket.fileStreamToBuffer(stream);
+    const buffer = await this.fileStreamToBuffer(stream);
     addLog.debug('get dataset file buffer', { time: Date.now() - start });

     const encoding = detectFileEncoding(buffer);
@@ -159,7 +123,7 @@
       }
     });

-    this.addRawTextBuffer({
+    this.rawTextSource.addRawTextBuffer({
       sourceId: fileId,
       sourceName: filename,
       text: rawText,
@@ -195,11 +159,11 @@

     await MongoS3TTL.create({
       minioKey: key,
-      bucketName: this.bucket.name,
+      bucketName: this.bucketName,
       expiredTime: addHours(new Date(), 3)
     });

-    await this.bucket.putObject(key, stream, size, {
+    await this.putObject(key, stream, size, {
       'content-type': Mimes[path.extname(truncatedFilename) as keyof typeof Mimes],
       'upload-time': new Date().toISOString(),
       'origin-filename': encodeURIComponent(truncatedFilename)
@@ -207,51 +171,6 @@

     return key;
   }
-
-  async addRawTextBuffer(params: AddRawTextBufferParams) {
-    const { sourceId, sourceName, text, customPdfParse } =
-      AddRawTextBufferParamsSchema.parse(params);
-
-    // A key maps to exactly one object, so hashing the key is enough; there is no need to hash the file content
-    const hash = createHash('md5').update(sourceId).digest('hex');
-    const key = getFileS3Key.rawText({ hash, customPdfParse });
-
-    await MongoS3TTL.create({
-      minioKey: key,
-      bucketName: this.bucket.name,
-      expiredTime: addMinutes(new Date(), 20)
-    });
-
-    const buffer = Buffer.from(text);
-    await this.bucket.putObject(key, buffer, buffer.length, {
-      'content-type': 'text/plain',
-      'origin-filename': encodeURIComponent(sourceName),
-      'upload-time': new Date().toISOString()
-    });
-
-    return key;
-  }
-
-  async getRawTextBuffer(params: GetRawTextBufferParams) {
-    const { customPdfParse, sourceId } = params;
-
-    const hash = createHash('md5').update(sourceId).digest('hex');
-    const key = getFileS3Key.rawText({ hash, customPdfParse });
-
-    if (!(await this.bucket.isObjectExists(key))) return null;
-
-    const [stream, metadata] = await Promise.all([
-      this.bucket.getObject(key),
-      this.getFileMetadata(key)
-    ]);
-
-    const buffer = await this.bucket.fileStreamToBuffer(stream);
-
-    return {
-      text: buffer.toString('utf-8'),
-      filename: metadata.filename
-    };
-  }
 }

 export function getS3DatasetSource() {
@@ -59,12 +59,3 @@ export const UploadParamsSchema = z.union([
   })
 ]);
 export type UploadParams = z.input<typeof UploadParamsSchema>;
-
-export const AddRawTextBufferParamsSchema = z.object({
-  customPdfParse: z.boolean().optional(),
-  sourceId: z.string().nonempty(),
-  sourceName: z.string().nonempty(),
-  text: z.string()
-});
-export type AddRawTextBufferParams = z.input<typeof AddRawTextBufferParamsSchema>;
-export type GetRawTextBufferParams = Pick<AddRawTextBufferParams, 'customPdfParse' | 'sourceId'>;
@@ -0,0 +1,79 @@
+import { S3PrivateBucket } from '../../buckets/private';
+import {
+  type AddRawTextBufferParams,
+  AddRawTextBufferParamsSchema,
+  type GetRawTextBufferParams
+} from './type';
+import { MongoS3TTL } from '../../schema';
+import { addMinutes } from 'date-fns';
+import { getFileS3Key } from '../../utils';
+import { createHash } from 'node:crypto';
+
+export class S3RawTextSource extends S3PrivateBucket {
+  constructor() {
+    super();
+  }
+
+  // Get the original filename from file metadata
+  async getFilename(key: string) {
+    const stat = await this.statObject(key);
+    if (!stat) return '';
+
+    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
+    return filename;
+  }
+
+  async addRawTextBuffer(params: AddRawTextBufferParams) {
+    const { sourceId, sourceName, text, customPdfParse } =
+      AddRawTextBufferParamsSchema.parse(params);
+
+    // A key maps to exactly one object, so hashing the key is enough; there is no need to hash the file content
+    const hash = createHash('md5').update(sourceId).digest('hex');
+    const key = getFileS3Key.rawText({ hash, customPdfParse });
+    const buffer = Buffer.from(text);
+
+    await MongoS3TTL.create({
+      minioKey: key,
+      bucketName: this.bucketName,
+      expiredTime: addMinutes(new Date(), 20)
+    });
+
+    await this.putObject(key, buffer, buffer.length, {
+      'content-type': 'text/plain',
+      'origin-filename': encodeURIComponent(sourceName),
+      'upload-time': new Date().toISOString()
+    });
+
+    return key;
+  }
+
+  async getRawTextBuffer(params: GetRawTextBufferParams) {
+    const { customPdfParse, sourceId } = params;
+
+    const hash = createHash('md5').update(sourceId).digest('hex');
+    const key = getFileS3Key.rawText({ hash, customPdfParse });
+
+    if (!(await this.isObjectExists(key))) return null;
+
+    const [stream, filename] = await Promise.all([this.getFileStream(key), this.getFilename(key)]);
+
+    const buffer = await this.fileStreamToBuffer(stream);
+
+    return {
+      text: buffer.toString('utf-8'),
+      filename
+    };
+  }
+}
+
+export function getS3RawTextSource() {
+  if (global.rawTextBucket) {
+    return global.rawTextBucket;
+  }
+  global.rawTextBucket = new S3RawTextSource();
+  return global.rawTextBucket;
+}
+
+declare global {
+  var rawTextBucket: S3RawTextSource;
+}
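Note: getS3RawTextSource caches the instance on global, a common Node/Next.js idiom so repeated imports (and dev-server hot reloads, which preserve globals) share one object rather than re-running bucket initialization. Usage sketch:

import { getS3RawTextSource } from './rawText';

const a = getS3RawTextSource();
const b = getS3RawTextSource();
console.log(a === b); // true: both refer to the same cached S3RawTextSource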
@@ -0,0 +1,10 @@
+import z from 'zod';
+
+export const AddRawTextBufferParamsSchema = z.object({
+  customPdfParse: z.boolean().optional(),
+  sourceId: z.string().nonempty(),
+  sourceName: z.string().nonempty(),
+  text: z.string()
+});
+export type AddRawTextBufferParams = z.input<typeof AddRawTextBufferParamsSchema>;
+export type GetRawTextBufferParams = Pick<AddRawTextBufferParams, 'customPdfParse' | 'sourceId'>;
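A quick sketch of how these zod definitions are used (illustrative values): parse validates at the method boundary and throws on bad input, while z.input derives the caller-facing TypeScript type.

import { AddRawTextBufferParamsSchema, type AddRawTextBufferParams } from './type';

const params: AddRawTextBufferParams = {
  sourceId: 'file-123',      // must be a non-empty string
  sourceName: 'report.pdf',  // must be a non-empty string
  text: 'raw text content'   // customPdfParse is optional and may be omitted
};

// Throws a ZodError if, for example, sourceId were an empty string.
const parsed = AddRawTextBufferParamsSchema.parse(params);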
@@ -9,7 +9,7 @@ import { addLog } from '../../../../common/system/log';
 import { readFileRawTextByUrl } from '../../read';
 import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type';
 import { type RequireOnlyOne } from '@fastgpt/global/common/type/utils';
-import { getS3DatasetSource } from '../../../../common/s3/sources/dataset';
+import { getS3RawTextSource } from '../../../../common/s3/sources/rawText';

 type ResponseDataType = {
   success: boolean;
@@ -154,7 +154,7 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer }
     }
     if (previewUrl) {
       // Get from buffer
-      const rawTextBuffer = await getS3DatasetSource().getRawTextBuffer({
+      const rawTextBuffer = await getS3RawTextSource().getRawTextBuffer({
         sourceId: previewUrl,
         customPdfParse
       });
@@ -175,7 +175,7 @@
       getFormatText: true
     });

-    getS3DatasetSource().addRawTextBuffer({
+    getS3RawTextSource().addRawTextBuffer({
       sourceId: previewUrl,
       sourceName: title || '',
       text: rawText,
@@ -20,7 +20,7 @@ import { S3ChatSource } from '../../../../common/s3/sources/chat';
 import path from 'node:path';
 import { S3Buckets } from '../../../../common/s3/constants';
 import { S3Sources } from '../../../../common/s3/type';
-import { getS3DatasetSource } from '../../../../common/s3/sources/dataset';
+import { getS3RawTextSource } from '../../../../common/s3/sources/rawText';

 type Props = ModuleDispatchProps<{
   [NodeInputKeyEnum.fileUrlList]: string[];
@@ -175,17 +175,17 @@ export const getFileContentFromLinks = async ({
     parseUrlList
       .map(async (url) => {
         // Get from buffer
-        const rawTextBuffer = await getS3DatasetSource().getRawTextBuffer({
+        const rawTextBuffer = await getS3RawTextSource().getRawTextBuffer({
           sourceId: url,
           customPdfParse
         });
-        // if (rawTextBuffer) {
-        //   return formatResponseObject({
-        //     filename: rawTextBuffer.filename || url,
-        //     url,
-        //     content: rawTextBuffer.text
-        //   });
-        // }
+        if (rawTextBuffer) {
+          return formatResponseObject({
+            filename: rawTextBuffer.filename || url,
+            url,
+            content: rawTextBuffer.text
+          });
+        }

         try {
           if (isInternalAddress(url)) {
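Re-enabling this block restores the read-through cache around the raw-text bucket: check the 20-minute buffer first, do the expensive fetch-and-parse only on a miss, then write the result back. The shape of the pattern, with fetchAndParse as a hypothetical stand-in for the download logic below:

import { getS3RawTextSource } from '../../../../common/s3/sources/rawText';

declare function fetchAndParse(url: string): Promise<string>; // hypothetical stand-in

async function readWithCache(url: string, customPdfParse?: boolean) {
  const cached = await getS3RawTextSource().getRawTextBuffer({ sourceId: url, customPdfParse });
  if (cached) return cached.text; // cache hit: skip download and parsing

  const text = await fetchAndParse(url); // cache miss: do the expensive work
  await getS3RawTextSource().addRawTextBuffer({
    sourceId: url,
    sourceName: url,
    text,
    customPdfParse
  });
  return text;
}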
@@ -285,7 +285,7 @@
     const replacedText = replaceS3KeyToPreviewUrl(rawText, addDays(new Date(), 90));

     // Add to buffer
-    getS3DatasetSource().addRawTextBuffer({
+    getS3RawTextSource().addRawTextBuffer({
       sourceId: url,
       sourceName: filename,
       text: replacedText,
@@ -20,8 +20,8 @@ export const authCollectionFile = async ({
   const authRes = await parseHeaderCert(props);

   if (isS3ObjectKey(fileId, 'dataset')) {
-    const stat = await getS3DatasetSource().getDatasetFileStat(fileId);
-    if (!stat) return Promise.reject(CommonErrEnum.fileNotFound);
+    const exists = await getS3DatasetSource().isObjectExists(fileId);
+    if (!exists) return Promise.reject(CommonErrEnum.fileNotFound);
   } else {
     return Promise.reject('Invalid dataset file key');
   }
@@ -31,7 +31,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<

   const [file, fileStream] = await Promise.all([
     getS3DatasetSource().getFileMetadata(fileId),
-    getS3DatasetSource().getDatasetFileStream(fileId)
+    getS3DatasetSource().getFileStream(fileId)
   ]);

   if (!file) {
@@ -82,7 +82,10 @@ async function handler(

   const s3ImageIds = imageIds.filter((id) => isS3ObjectKey(id, 'dataset'));
   for (const id of s3ImageIds) {
-    imageSizeMap.set(id, (await getS3DatasetSource().getFileMetadata(id)).contentLength);
+    const metadata = await getS3DatasetSource().getFileMetadata(id);
+    if (metadata) {
+      imageSizeMap.set(id, metadata.contentLength);
+    }
   }
 }
@@ -20,21 +20,23 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
     (() => {
       if (isS3ObjectKey(objectKey, 'dataset')) {
         return [
-          s3DatasetSource.getDatasetFileStream(objectKey),
+          s3DatasetSource.getFileStream(objectKey),
           s3DatasetSource.getFileMetadata(objectKey)
         ];
       } else {
         return [
-          s3ChatSource.getChatFileStream(objectKey),
+          s3ChatSource.getFileStream(objectKey),
           s3ChatSource.getFileMetadata(objectKey)
         ];
       }
     })()
   );

-  res.setHeader('Content-Type', metadata.contentType);
+  if (metadata) {
+    res.setHeader('Content-Type', metadata.contentType);
+    res.setHeader('Content-Length', metadata.contentLength);
+  }
   res.setHeader('Cache-Control', 'public, max-age=31536000');
-  res.setHeader('Content-Length', metadata.contentLength);

   stream.pipe(res);
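Note: with metadata now possibly null, the handler only sets Content-Type and Content-Length when they are known, and still streams the body either way. A condensed sketch of that response path (assumed Node-style types, not the exact handler):

import type { ServerResponse } from 'node:http';
import type { Readable } from 'node:stream';

type FileMetadata = { contentType: string; contentLength: number } | null | undefined;

function sendFile(res: ServerResponse, stream: Readable, metadata: FileMetadata) {
  if (metadata) {
    res.setHeader('Content-Type', metadata.contentType);
    res.setHeader('Content-Length', metadata.contentLength);
  }
  res.setHeader('Cache-Control', 'public, max-age=31536000');
  stream.pipe(res); // the body streams through even when metadata was unavailable
}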
@@ -8,7 +8,7 @@ const createMockS3Bucket = () => ({
   exist: vi.fn().mockResolvedValue(true),
   delete: vi.fn().mockResolvedValue(undefined),
   putObject: vi.fn().mockResolvedValue(undefined),
-  getObject: vi.fn().mockResolvedValue(null),
+  getFileStream: vi.fn().mockResolvedValue(null),
   statObject: vi.fn().mockResolvedValue({ size: 0, etag: 'mock-etag' }),
   move: vi.fn().mockResolvedValue(undefined),
   copy: vi.fn().mockResolvedValue(undefined),
@@ -39,7 +39,7 @@ const createMockMinioClient = vi.hoisted(() => {
     copyObject: vi.fn().mockResolvedValue(undefined),
     removeObject: vi.fn().mockResolvedValue(undefined),
     putObject: vi.fn().mockResolvedValue({ etag: 'mock-etag' }),
-    getObject: vi.fn().mockResolvedValue(null),
+    getFileStream: vi.fn().mockResolvedValue(null),
     statObject: vi.fn().mockResolvedValue({ size: 0, etag: 'mock-etag' }),
     presignedGetObject: vi.fn().mockResolvedValue('http://localhost:9000/mock-bucket/mock-object'),
     presignedPostPolicy: vi.fn().mockResolvedValue({
@@ -81,7 +81,7 @@ const createMockBucketClass = (defaultName: string) => {
   }
   async delete() {}
   async putObject() {}
-  async getObject() {
+  async getFileStream() {
     return null;
   }
   async statObject() {