diff --git a/packages/service/common/s3/buckets/base.ts b/packages/service/common/s3/buckets/base.ts index 34f44c3e4..9af8020cd 100644 --- a/packages/service/common/s3/buckets/base.ts +++ b/packages/service/common/s3/buckets/base.ts @@ -15,6 +15,7 @@ import { addLog } from '../../system/log'; import { addS3DelJob } from '../mq'; import { type Readable } from 'node:stream'; import { type UploadFileByBufferParams, UploadFileByBufferSchema } from '../type'; +import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; export class S3BaseBucket { private _client: Client; @@ -26,7 +27,7 @@ export class S3BaseBucket { * @param options the options for the s3 client */ constructor( - private readonly bucketName: string, + public readonly bucketName: string, public options: Partial = defaultS3Options ) { options = { ...defaultS3Options, ...options }; @@ -56,20 +57,18 @@ export class S3BaseBucket { } const init = async () => { - if (!(await this.exist())) { + // Not exists bucket, create it + if (!(await this.client.bucketExists(this.bucketName))) { await this.client.makeBucket(this.bucketName); } await this.options.afterInit?.(); - console.log(`S3 init success: ${this.name}`); + console.log(`S3 init success: ${this.bucketName}`); }; if (this.options.init) { init(); } } - get name(): string { - return this.bucketName; - } get client(): Client { return this._client; } @@ -110,21 +109,17 @@ export class S3BaseBucket { copyConditions?: CopyConditions; }; }): ReturnType { - const bucket = this.name; + const bucket = this.bucketName; if (options?.temporary) { await MongoS3TTL.create({ minioKey: to, - bucketName: this.name, + bucketName: this.bucketName, expiredTime: addHours(new Date(), 24) }); } return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions); } - exist(): Promise { - return this.client.bucketExists(this.name); - } - async delete(objectKey: string, options?: RemoveOptions): Promise { try { if (!objectKey) return Promise.resolve(); @@ -133,11 +128,11 @@ export class S3BaseBucket { const fileParsedPrefix = `${path.dirname(objectKey)}/${path.basename(objectKey, path.extname(objectKey))}-parsed`; await this.addDeleteJob({ prefix: fileParsedPrefix }); - return await this.client.removeObject(this.name, objectKey, options); + return await this.client.removeObject(this.bucketName, objectKey, options); } catch (error) { if (error instanceof S3Error) { if (error.code === 'InvalidObjectName') { - addLog.warn(`${this.name} delete object not found: ${objectKey}`, error); + addLog.warn(`${this.bucketName} delete object not found: ${objectKey}`, error); return Promise.resolve(); } } @@ -145,27 +140,43 @@ export class S3BaseBucket { } } + // 列出文件 listObjectsV2( ...params: Parameters extends [string, ...infer R] ? R : never ) { - return this.client.listObjectsV2(this.name, ...params); + return this.client.listObjectsV2(this.bucketName, ...params); } + // 上传文件 putObject(...params: Parameters extends [string, ...infer R] ? R : never) { - return this.client.putObject(this.name, ...params); + return this.client.putObject(this.bucketName, ...params); } - getObject(...params: Parameters extends [string, ...infer R] ? R : never) { - return this.client.getObject(this.name, ...params); + // 获取文件流 + getFileStream( + ...params: Parameters extends [string, ...infer R] ? R : never + ) { + return this.client.getObject(this.bucketName, ...params); } - statObject(...params: Parameters extends [string, ...infer R] ? R : never) { - return this.client.statObject(this.name, ...params); + // 获取文件状态 + async statObject( + ...params: Parameters extends [string, ...infer R] ? R : never + ) { + try { + return await this.client.statObject(this.bucketName, ...params); + } catch (error) { + if (error instanceof S3Error && error.message === 'Not Found') { + return null; + } + return Promise.reject(error); + } } + // 判断文件是否存在 async isObjectExists(key: string): Promise { try { - await this.client.statObject(this.name, key); + await this.client.statObject(this.bucketName, key); return true; } catch (err) { if (err instanceof S3Error && err.message === 'Not Found') { @@ -175,6 +186,7 @@ export class S3BaseBucket { } } + // 将文件流转换为Buffer async fileStreamToBuffer(stream: Readable): Promise { const chunks: Buffer[] = []; for await (const chunk of stream) { @@ -184,7 +196,7 @@ export class S3BaseBucket { } addDeleteJob(params: Omit[0], 'bucketName'>) { - return addS3DelJob({ ...params, bucketName: this.name }); + return addS3DelJob({ ...params, bucketName: this.bucketName }); } async createPostPresignedUrl( @@ -202,7 +214,7 @@ export class S3BaseBucket { const policy = this.externalClient.newPostPolicy(); policy.setKey(key); - policy.setBucket(this.name); + policy.setBucket(this.bucketName); policy.setContentType(contentType); if (formatMaxFileSize) { policy.setContentLengthRange(1, formatMaxFileSize); @@ -220,7 +232,7 @@ export class S3BaseBucket { if (expiredHours) { await MongoS3TTL.create({ minioKey: key, - bucketName: this.name, + bucketName: this.bucketName, expiredTime: addHours(new Date(), expiredHours) }); } @@ -242,7 +254,7 @@ export class S3BaseBucket { const { key, expiredHours } = parsed; const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟 - return await this.externalClient.presignedGetObject(this.name, key, expires); + return await this.externalClient.presignedGetObject(this.bucketName, key, expires); } async createPreviewUrl(params: createPreviewUrlParams) { @@ -251,7 +263,7 @@ export class S3BaseBucket { const { key, expiredHours } = parsed; const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟 - return await this.client.presignedGetObject(this.name, key, expires); + return await this.client.presignedGetObject(this.bucketName, key, expires); } async uploadFileByBuffer(params: UploadFileByBufferParams) { @@ -259,7 +271,7 @@ export class S3BaseBucket { await MongoS3TTL.create({ minioKey: key, - bucketName: this.name, + bucketName: this.bucketName, expiredTime: addHours(new Date(), 1) }); await this.putObject(key, buffer, undefined, { @@ -274,4 +286,22 @@ export class S3BaseBucket { }) }; } + + // 对外包装的方法 + // 获取文件元数据 + async getFileMetadata(key: string) { + const stat = await this.statObject(key); + if (!stat) return; + + const contentLength = stat.size; + const filename: string = decodeURIComponent(stat.metaData['origin-filename']); + const extension = parseFileExtensionFromUrl(filename); + const contentType: string = stat.metaData['content-type']; + return { + filename, + extension, + contentType, + contentLength + }; + } } diff --git a/packages/service/common/s3/buckets/public.ts b/packages/service/common/s3/buckets/public.ts index 99a8b4cc9..331c79277 100644 --- a/packages/service/common/s3/buckets/public.ts +++ b/packages/service/common/s3/buckets/public.ts @@ -7,7 +7,7 @@ export class S3PublicBucket extends S3BaseBucket { super(S3Buckets.public, { ...options, afterInit: async () => { - const bucket = this.name; + const bucket = this.bucketName; const policy = JSON.stringify({ Version: '2012-10-17', Statement: [ @@ -34,7 +34,7 @@ export class S3PublicBucket extends S3BaseBucket { const protocol = this.options.useSSL ? 'https' : 'http'; const hostname = this.options.endPoint; const port = this.options.port; - const bucket = this.name; + const bucket = this.bucketName; const url = new URL(`${protocol}://${hostname}:${port}/${bucket}/${objectKey}`); diff --git a/packages/service/common/s3/sources/avatar.ts b/packages/service/common/s3/sources/avatar.ts index 6661d504e..0d9ef031d 100644 --- a/packages/service/common/s3/sources/avatar.ts +++ b/packages/service/common/s3/sources/avatar.ts @@ -5,11 +5,9 @@ import { imageBaseUrl } from '@fastgpt/global/common/file/image/constants'; import type { ClientSession } from 'mongoose'; import { getFileS3Key } from '../utils'; -class S3AvatarSource { - private bucket: S3PublicBucket; - +class S3AvatarSource extends S3PublicBucket { constructor() { - this.bucket = new S3PublicBucket(); + super(); } get prefix(): string { @@ -27,7 +25,7 @@ class S3AvatarSource { }) { const { fileKey } = getFileS3Key.avatar({ teamId, filename }); - return this.bucket.createPostPresignedUrl( + return this.createPostPresignedUrl( { filename, rawKey: fileKey }, { expiredHours: autoExpired ? 1 : undefined, // 1 Hours @@ -36,19 +34,15 @@ class S3AvatarSource { ); } - createPublicUrl(objectKey: string): string { - return this.bucket.createPublicUrl(objectKey); - } - async removeAvatarTTL(avatar: string, session?: ClientSession): Promise { const key = avatar.slice(this.prefix.length); - await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucket.name }, session); + await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucketName }, session); } async deleteAvatar(avatar: string, session?: ClientSession): Promise { const key = avatar.slice(this.prefix.length); - await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucket.name }, session); - await this.bucket.delete(key); + await MongoS3TTL.deleteOne({ minioKey: key, bucketName: this.bucketName }, session); + await this.delete(key); } async refreshAvatar(newAvatar?: string, oldAvatar?: string, session?: ClientSession) { @@ -78,7 +72,7 @@ class S3AvatarSource { }) { const from = key.slice(this.prefix.length); const to = `${S3Sources.avatar}/${teamId}/${filename}`; - await this.bucket.copy({ from, to, options: { temporary } }); + await this.copy({ from, to, options: { temporary } }); return this.prefix.concat(to); } } diff --git a/packages/service/common/s3/sources/chat/index.ts b/packages/service/common/s3/sources/chat/index.ts index 22463baf2..087c8f3fd 100644 --- a/packages/service/common/s3/sources/chat/index.ts +++ b/packages/service/common/s3/sources/chat/index.ts @@ -1,4 +1,3 @@ -import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; import { S3PrivateBucket } from '../../buckets/private'; import { S3Sources } from '../../type'; import { @@ -14,11 +13,9 @@ import { S3Buckets } from '../../constants'; import path from 'path'; import { getFileS3Key } from '../../utils'; -export class S3ChatSource { - private bucket: S3PrivateBucket; - +export class S3ChatSource extends S3PrivateBucket { constructor() { - this.bucket = new S3PrivateBucket(); + super(); } static parseChatUrl(url: string | URL) { @@ -51,46 +48,19 @@ export class S3ChatSource { } } - // 获取文件流 - getChatFileStream(key: string) { - return this.bucket.getObject(key); - } - - // 获取文件状态 - getChatFileStat(key: string) { - return this.bucket.statObject(key); - } - - // 获取文件元数据 - async getFileMetadata(key: string) { - const stat = await this.getChatFileStat(key); - if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' }; - - const contentLength = stat.size; - const filename: string = decodeURIComponent(stat.metaData['origin-filename']); - const extension = parseFileExtensionFromUrl(filename); - const contentType: string = stat.metaData['content-type']; - return { - filename, - extension, - contentType, - contentLength - }; - } - async createGetChatFileURL(params: { key: string; expiredHours?: number; external: boolean }) { const { key, expiredHours = 1, external = false } = params; // 默认一个小时 if (external) { - return await this.bucket.createExternalUrl({ key, expiredHours }); + return await this.createExternalUrl({ key, expiredHours }); } - return await this.bucket.createPreviewUrl({ key, expiredHours }); + return await this.createPreviewUrl({ key, expiredHours }); } async createUploadChatFileURL(params: CheckChatFileKeys) { const { appId, chatId, uId, filename, expiredTime } = ChatFileUploadSchema.parse(params); const { fileKey } = getFileS3Key.chat({ appId, chatId, uId, filename }); - return await this.bucket.createPostPresignedUrl( + return await this.createPostPresignedUrl( { rawKey: fileKey, filename }, { expiredHours: expiredTime ? differenceInHours(expiredTime, new Date()) : 24 } ); @@ -100,11 +70,11 @@ export class S3ChatSource { const { appId, chatId, uId } = DelChatFileByPrefixSchema.parse(params); const prefix = [S3Sources.chat, appId, uId, chatId].filter(Boolean).join('/'); - return this.bucket.addDeleteJob({ prefix }); + return this.addDeleteJob({ prefix }); } deleteChatFileByKey(key: string) { - return this.bucket.addDeleteJob({ key }); + return this.addDeleteJob({ key }); } async uploadChatFileByBuffer(params: UploadFileParams) { @@ -117,7 +87,7 @@ export class S3ChatSource { filename }); - return this.bucket.uploadFileByBuffer({ + return this.uploadFileByBuffer({ key: fileKey, buffer, contentType diff --git a/packages/service/common/s3/sources/dataset/index.ts b/packages/service/common/s3/sources/dataset/index.ts index 2acd2cd7e..c9ca9d6aa 100644 --- a/packages/service/common/s3/sources/dataset/index.ts +++ b/packages/service/common/s3/sources/dataset/index.ts @@ -2,8 +2,6 @@ import { S3Sources } from '../../type'; import { S3PrivateBucket } from '../../buckets/private'; import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; import { - type AddRawTextBufferParams, - AddRawTextBufferParamsSchema, type CreateGetDatasetFileURLParams, CreateGetDatasetFileURLParamsSchema, type CreateUploadDatasetFileParams, @@ -12,26 +10,26 @@ import { DeleteDatasetFilesByPrefixParamsSchema, type GetDatasetFileContentParams, GetDatasetFileContentParamsSchema, - type GetRawTextBufferParams, type UploadParams, UploadParamsSchema } from './type'; import { MongoS3TTL } from '../../schema'; -import { addHours, addMinutes } from 'date-fns'; +import { addHours } from 'date-fns'; import { addLog } from '../../../system/log'; import { detectFileEncoding } from '@fastgpt/global/common/file/tools'; import { readS3FileContentByBuffer } from '../../../file/read/utils'; import path from 'node:path'; import { Mimes } from '../../constants'; import { getFileS3Key, truncateFilename } from '../../utils'; -import { createHash } from 'node:crypto'; -import { S3Error } from 'minio'; +import type { S3RawTextSource } from '../rawText'; +import { getS3RawTextSource } from '../rawText'; -export class S3DatasetSource { - public bucket: S3PrivateBucket; +export class S3DatasetSource extends S3PrivateBucket { + private rawTextSource: S3RawTextSource; constructor() { - this.bucket = new S3PrivateBucket(); + super(); + this.rawTextSource = getS3RawTextSource(); } // 下载链接 @@ -39,19 +37,26 @@ export class S3DatasetSource { const { key, expiredHours, external } = CreateGetDatasetFileURLParamsSchema.parse(params); if (external) { - return await this.bucket.createExternalUrl({ key, expiredHours }); + return await this.createExternalUrl({ key, expiredHours }); } - return await this.bucket.createPreviewUrl({ key, expiredHours }); + return await this.createPreviewUrl({ key, expiredHours }); } // 上传链接 async createUploadDatasetFileURL(params: CreateUploadDatasetFileParams) { const { filename, datasetId } = CreateUploadDatasetFileParamsSchema.parse(params); const { fileKey } = getFileS3Key.dataset({ datasetId, filename }); - return await this.bucket.createPostPresignedUrl( - { rawKey: fileKey, filename }, - { expiredHours: 3 } - ); + return await this.createPostPresignedUrl({ rawKey: fileKey, filename }, { expiredHours: 3 }); + } + + // 单个键删除 + deleteDatasetFileByKey(key?: string) { + return this.addDeleteJob({ key }); + } + + // 多个键删除 + deleteDatasetFilesByKeys(keys: string[]) { + return this.addDeleteJob({ keys }); } /** @@ -62,68 +67,27 @@ export class S3DatasetSource { deleteDatasetFilesByPrefix(params: DeleteDatasetFilesByPrefixParams) { const { datasetId } = DeleteDatasetFilesByPrefixParamsSchema.parse(params); const prefix = [S3Sources.dataset, datasetId].filter(Boolean).join('/'); - return this.bucket.addDeleteJob({ prefix }); - } - - // 单个键删除 - deleteDatasetFileByKey(key?: string) { - return this.bucket.addDeleteJob({ key }); - } - - // 多个键删除 - deleteDatasetFilesByKeys(keys: string[]) { - return this.bucket.addDeleteJob({ keys }); - } - - // 获取文件流 - getDatasetFileStream(key: string) { - return this.bucket.getObject(key); - } - - // 获取文件状态 - getDatasetFileStat(key: string) { - try { - return this.bucket.statObject(key); - } catch (error) { - if (error instanceof S3Error && error.message === 'Not Found') { - return null; - } - return Promise.reject(error); - } - } - - // 获取文件元数据 - async getFileMetadata(key: string) { - const stat = await this.getDatasetFileStat(key); - if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' }; - - const contentLength = stat.size; - const filename: string = decodeURIComponent(stat.metaData['origin-filename']); - const extension = parseFileExtensionFromUrl(filename); - const contentType: string = stat.metaData['content-type']; - return { - filename, - extension, - contentType, - contentLength - }; + return this.addDeleteJob({ prefix }); } async getDatasetBase64Image(key: string): Promise { const [stream, metadata] = await Promise.all([ - this.getDatasetFileStream(key), + this.getFileStream(key), this.getFileMetadata(key) ]); - const buffer = await this.bucket.fileStreamToBuffer(stream); + const buffer = await this.fileStreamToBuffer(stream); const base64 = buffer.toString('base64'); - return `data:${metadata.contentType || 'image/jpeg'};base64,${base64}`; + return `data:${metadata?.contentType || 'image/jpeg'};base64,${base64}`; } async getDatasetFileRawText(params: GetDatasetFileContentParams) { const { fileId, teamId, tmbId, customPdfParse, getFormatText, usageId } = GetDatasetFileContentParamsSchema.parse(params); - const rawTextBuffer = await this.getRawTextBuffer({ customPdfParse, sourceId: fileId }); + const rawTextBuffer = await this.rawTextSource.getRawTextBuffer({ + customPdfParse, + sourceId: fileId + }); if (rawTextBuffer) { return { rawText: rawTextBuffer.text, @@ -133,14 +97,14 @@ export class S3DatasetSource { const [metadata, stream] = await Promise.all([ this.getFileMetadata(fileId), - this.getDatasetFileStream(fileId) + this.getFileStream(fileId) ]); - const extension = metadata.extension; - const filename: string = decodeURIComponent(metadata.filename); + const extension = metadata?.extension || ''; + const filename: string = decodeURIComponent(metadata?.filename || ''); const start = Date.now(); - const buffer = await this.bucket.fileStreamToBuffer(stream); + const buffer = await this.fileStreamToBuffer(stream); addLog.debug('get dataset file buffer', { time: Date.now() - start }); const encoding = detectFileEncoding(buffer); @@ -159,7 +123,7 @@ export class S3DatasetSource { } }); - this.addRawTextBuffer({ + this.rawTextSource.addRawTextBuffer({ sourceId: fileId, sourceName: filename, text: rawText, @@ -195,11 +159,11 @@ export class S3DatasetSource { await MongoS3TTL.create({ minioKey: key, - bucketName: this.bucket.name, + bucketName: this.bucketName, expiredTime: addHours(new Date(), 3) }); - await this.bucket.putObject(key, stream, size, { + await this.putObject(key, stream, size, { 'content-type': Mimes[path.extname(truncatedFilename) as keyof typeof Mimes], 'upload-time': new Date().toISOString(), 'origin-filename': encodeURIComponent(truncatedFilename) @@ -207,51 +171,6 @@ export class S3DatasetSource { return key; } - - async addRawTextBuffer(params: AddRawTextBufferParams) { - const { sourceId, sourceName, text, customPdfParse } = - AddRawTextBufferParamsSchema.parse(params); - - // 因为 Key 唯一对应一个 Object 所以不需要根据文件内容计算 Hash 直接用 Key 计算 Hash 就行了 - const hash = createHash('md5').update(sourceId).digest('hex'); - const key = getFileS3Key.rawText({ hash, customPdfParse }); - - await MongoS3TTL.create({ - minioKey: key, - bucketName: this.bucket.name, - expiredTime: addMinutes(new Date(), 20) - }); - - const buffer = Buffer.from(text); - await this.bucket.putObject(key, buffer, buffer.length, { - 'content-type': 'text/plain', - 'origin-filename': encodeURIComponent(sourceName), - 'upload-time': new Date().toISOString() - }); - - return key; - } - - async getRawTextBuffer(params: GetRawTextBufferParams) { - const { customPdfParse, sourceId } = params; - - const hash = createHash('md5').update(sourceId).digest('hex'); - const key = getFileS3Key.rawText({ hash, customPdfParse }); - - if (!(await this.bucket.isObjectExists(key))) return null; - - const [stream, metadata] = await Promise.all([ - this.bucket.getObject(key), - this.getFileMetadata(key) - ]); - - const buffer = await this.bucket.fileStreamToBuffer(stream); - - return { - text: buffer.toString('utf-8'), - filename: metadata.filename - }; - } } export function getS3DatasetSource() { diff --git a/packages/service/common/s3/sources/dataset/type.ts b/packages/service/common/s3/sources/dataset/type.ts index 79350d896..996218a30 100644 --- a/packages/service/common/s3/sources/dataset/type.ts +++ b/packages/service/common/s3/sources/dataset/type.ts @@ -59,12 +59,3 @@ export const UploadParamsSchema = z.union([ }) ]); export type UploadParams = z.input; - -export const AddRawTextBufferParamsSchema = z.object({ - customPdfParse: z.boolean().optional(), - sourceId: z.string().nonempty(), - sourceName: z.string().nonempty(), - text: z.string() -}); -export type AddRawTextBufferParams = z.input; -export type GetRawTextBufferParams = Pick; diff --git a/packages/service/common/s3/sources/rawText/index.ts b/packages/service/common/s3/sources/rawText/index.ts new file mode 100644 index 000000000..5f09e0675 --- /dev/null +++ b/packages/service/common/s3/sources/rawText/index.ts @@ -0,0 +1,79 @@ +import { S3PrivateBucket } from '../../buckets/private'; +import { + type AddRawTextBufferParams, + AddRawTextBufferParamsSchema, + type GetRawTextBufferParams +} from './type'; +import { MongoS3TTL } from '../../schema'; +import { addMinutes } from 'date-fns'; +import { getFileS3Key } from '../../utils'; +import { createHash } from 'node:crypto'; + +export class S3RawTextSource extends S3PrivateBucket { + constructor() { + super(); + } + + // 获取文件元数据 + async getFilename(key: string) { + const stat = await this.statObject(key); + if (!stat) return ''; + + const filename: string = decodeURIComponent(stat.metaData['origin-filename']); + return filename; + } + + async addRawTextBuffer(params: AddRawTextBufferParams) { + const { sourceId, sourceName, text, customPdfParse } = + AddRawTextBufferParamsSchema.parse(params); + + // 因为 Key 唯一对应一个 Object 所以不需要根据文件内容计算 Hash 直接用 Key 计算 Hash 就行了 + const hash = createHash('md5').update(sourceId).digest('hex'); + const key = getFileS3Key.rawText({ hash, customPdfParse }); + const buffer = Buffer.from(text); + + await MongoS3TTL.create({ + minioKey: key, + bucketName: this.bucketName, + expiredTime: addMinutes(new Date(), 20) + }); + + await this.putObject(key, buffer, buffer.length, { + 'content-type': 'text/plain', + 'origin-filename': encodeURIComponent(sourceName), + 'upload-time': new Date().toISOString() + }); + + return key; + } + + async getRawTextBuffer(params: GetRawTextBufferParams) { + const { customPdfParse, sourceId } = params; + + const hash = createHash('md5').update(sourceId).digest('hex'); + const key = getFileS3Key.rawText({ hash, customPdfParse }); + + if (!(await this.isObjectExists(key))) return null; + + const [stream, filename] = await Promise.all([this.getFileStream(key), this.getFilename(key)]); + + const buffer = await this.fileStreamToBuffer(stream); + + return { + text: buffer.toString('utf-8'), + filename + }; + } +} + +export function getS3RawTextSource() { + if (global.rawTextBucket) { + return global.rawTextBucket; + } + global.rawTextBucket = new S3RawTextSource(); + return global.rawTextBucket; +} + +declare global { + var rawTextBucket: S3RawTextSource; +} diff --git a/packages/service/common/s3/sources/rawText/type.ts b/packages/service/common/s3/sources/rawText/type.ts new file mode 100644 index 000000000..4979abcc0 --- /dev/null +++ b/packages/service/common/s3/sources/rawText/type.ts @@ -0,0 +1,10 @@ +import z from 'zod'; + +export const AddRawTextBufferParamsSchema = z.object({ + customPdfParse: z.boolean().optional(), + sourceId: z.string().nonempty(), + sourceName: z.string().nonempty(), + text: z.string() +}); +export type AddRawTextBufferParams = z.input; +export type GetRawTextBufferParams = Pick; diff --git a/packages/service/core/dataset/apiDataset/custom/api.ts b/packages/service/core/dataset/apiDataset/custom/api.ts index a36534fa5..91f7eb87d 100644 --- a/packages/service/core/dataset/apiDataset/custom/api.ts +++ b/packages/service/core/dataset/apiDataset/custom/api.ts @@ -9,7 +9,7 @@ import { addLog } from '../../../../common/system/log'; import { readFileRawTextByUrl } from '../../read'; import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type'; import { type RequireOnlyOne } from '@fastgpt/global/common/type/utils'; -import { getS3DatasetSource } from '../../../../common/s3/sources/dataset'; +import { getS3RawTextSource } from '../../../../common/s3/sources/rawText'; type ResponseDataType = { success: boolean; @@ -154,7 +154,7 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer } } if (previewUrl) { // Get from buffer - const rawTextBuffer = await getS3DatasetSource().getRawTextBuffer({ + const rawTextBuffer = await getS3RawTextSource().getRawTextBuffer({ sourceId: previewUrl, customPdfParse }); @@ -175,7 +175,7 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer } getFormatText: true }); - getS3DatasetSource().addRawTextBuffer({ + getS3RawTextSource().addRawTextBuffer({ sourceId: previewUrl, sourceName: title || '', text: rawText, diff --git a/packages/service/core/workflow/dispatch/tools/readFiles.ts b/packages/service/core/workflow/dispatch/tools/readFiles.ts index 38b9df006..de88d81fc 100644 --- a/packages/service/core/workflow/dispatch/tools/readFiles.ts +++ b/packages/service/core/workflow/dispatch/tools/readFiles.ts @@ -20,7 +20,7 @@ import { S3ChatSource } from '../../../../common/s3/sources/chat'; import path from 'node:path'; import { S3Buckets } from '../../../../common/s3/constants'; import { S3Sources } from '../../../../common/s3/type'; -import { getS3DatasetSource } from '../../../../common/s3/sources/dataset'; +import { getS3RawTextSource } from '../../../../common/s3/sources/rawText'; type Props = ModuleDispatchProps<{ [NodeInputKeyEnum.fileUrlList]: string[]; @@ -175,17 +175,17 @@ export const getFileContentFromLinks = async ({ parseUrlList .map(async (url) => { // Get from buffer - const rawTextBuffer = await getS3DatasetSource().getRawTextBuffer({ + const rawTextBuffer = await getS3RawTextSource().getRawTextBuffer({ sourceId: url, customPdfParse }); - // if (rawTextBuffer) { - // return formatResponseObject({ - // filename: rawTextBuffer.filename || url, - // url, - // content: rawTextBuffer.text - // }); - // } + if (rawTextBuffer) { + return formatResponseObject({ + filename: rawTextBuffer.filename || url, + url, + content: rawTextBuffer.text + }); + } try { if (isInternalAddress(url)) { @@ -285,7 +285,7 @@ export const getFileContentFromLinks = async ({ const replacedText = replaceS3KeyToPreviewUrl(rawText, addDays(new Date(), 90)); // Add to buffer - getS3DatasetSource().addRawTextBuffer({ + getS3RawTextSource().addRawTextBuffer({ sourceId: url, sourceName: filename, text: replacedText, diff --git a/packages/service/support/permission/auth/file.ts b/packages/service/support/permission/auth/file.ts index ac883a6fb..b41fd24c5 100644 --- a/packages/service/support/permission/auth/file.ts +++ b/packages/service/support/permission/auth/file.ts @@ -20,8 +20,8 @@ export const authCollectionFile = async ({ const authRes = await parseHeaderCert(props); if (isS3ObjectKey(fileId, 'dataset')) { - const stat = await getS3DatasetSource().getDatasetFileStat(fileId); - if (!stat) return Promise.reject(CommonErrEnum.fileNotFound); + const exists = await getS3DatasetSource().isObjectExists(fileId); + if (!exists) return Promise.reject(CommonErrEnum.fileNotFound); } else { return Promise.reject('Invalid dataset file key'); } diff --git a/projects/app/src/pages/api/common/file/read/[filename].ts b/projects/app/src/pages/api/common/file/read/[filename].ts index 13748b6e8..8b26df479 100644 --- a/projects/app/src/pages/api/common/file/read/[filename].ts +++ b/projects/app/src/pages/api/common/file/read/[filename].ts @@ -31,7 +31,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse< const [file, fileStream] = await Promise.all([ getS3DatasetSource().getFileMetadata(fileId), - getS3DatasetSource().getDatasetFileStream(fileId) + getS3DatasetSource().getFileStream(fileId) ]); if (!file) { diff --git a/projects/app/src/pages/api/core/dataset/data/v2/list.ts b/projects/app/src/pages/api/core/dataset/data/v2/list.ts index 46f62d0b3..5486a70a4 100644 --- a/projects/app/src/pages/api/core/dataset/data/v2/list.ts +++ b/projects/app/src/pages/api/core/dataset/data/v2/list.ts @@ -82,7 +82,10 @@ async function handler( const s3ImageIds = imageIds.filter((id) => isS3ObjectKey(id, 'dataset')); for (const id of s3ImageIds) { - imageSizeMap.set(id, (await getS3DatasetSource().getFileMetadata(id)).contentLength); + const metadata = await getS3DatasetSource().getFileMetadata(id); + if (metadata) { + imageSizeMap.set(id, metadata.contentLength); + } } } diff --git a/projects/app/src/pages/api/system/file/[jwt].ts b/projects/app/src/pages/api/system/file/[jwt].ts index 4086a6740..35740129a 100644 --- a/projects/app/src/pages/api/system/file/[jwt].ts +++ b/projects/app/src/pages/api/system/file/[jwt].ts @@ -20,21 +20,23 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse) (() => { if (isS3ObjectKey(objectKey, 'dataset')) { return [ - s3DatasetSource.getDatasetFileStream(objectKey), + s3DatasetSource.getFileStream(objectKey), s3DatasetSource.getFileMetadata(objectKey) ]; } else { return [ - s3ChatSource.getChatFileStream(objectKey), + s3ChatSource.getFileStream(objectKey), s3ChatSource.getFileMetadata(objectKey) ]; } })() ); - res.setHeader('Content-Type', metadata.contentType); + if (metadata) { + res.setHeader('Content-Type', metadata.contentType); + res.setHeader('Content-Length', metadata.contentLength); + } res.setHeader('Cache-Control', 'public, max-age=31536000'); - res.setHeader('Content-Length', metadata.contentLength); stream.pipe(res); diff --git a/test/mocks/common/s3.ts b/test/mocks/common/s3.ts index 385a97a5f..7fc2a7723 100644 --- a/test/mocks/common/s3.ts +++ b/test/mocks/common/s3.ts @@ -8,7 +8,7 @@ const createMockS3Bucket = () => ({ exist: vi.fn().mockResolvedValue(true), delete: vi.fn().mockResolvedValue(undefined), putObject: vi.fn().mockResolvedValue(undefined), - getObject: vi.fn().mockResolvedValue(null), + getFileStream: vi.fn().mockResolvedValue(null), statObject: vi.fn().mockResolvedValue({ size: 0, etag: 'mock-etag' }), move: vi.fn().mockResolvedValue(undefined), copy: vi.fn().mockResolvedValue(undefined), @@ -39,7 +39,7 @@ const createMockMinioClient = vi.hoisted(() => { copyObject: vi.fn().mockResolvedValue(undefined), removeObject: vi.fn().mockResolvedValue(undefined), putObject: vi.fn().mockResolvedValue({ etag: 'mock-etag' }), - getObject: vi.fn().mockResolvedValue(null), + getFileStream: vi.fn().mockResolvedValue(null), statObject: vi.fn().mockResolvedValue({ size: 0, etag: 'mock-etag' }), presignedGetObject: vi.fn().mockResolvedValue('http://localhost:9000/mock-bucket/mock-object'), presignedPostPolicy: vi.fn().mockResolvedValue({ @@ -81,7 +81,7 @@ const createMockBucketClass = (defaultName: string) => { } async delete() {} async putObject() {} - async getObject() { + async getFileStream() { return null; } async statObject() {