mirror of
https://github.com/labring/FastGPT.git
synced 2025-12-25 20:02:47 +00:00
Some checks failed
Build FastGPT images in Personal warehouse / get-vars (push) Waiting to run
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:amd64 runs-on:ubuntu-24.04]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:arm64 runs-on:ubuntu-24.04-arm]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / release-fastgpt-images (push) Blocked by required conditions
Document deploy / sync-images (push) Has been cancelled
Document deploy / generate-timestamp (push) Has been cancelled
Document deploy / build-images (map[domain:https://fastgpt.cn suffix:cn]) (push) Has been cancelled
Document deploy / build-images (map[domain:https://fastgpt.io suffix:io]) (push) Has been cancelled
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.cn kube_config:KUBE_CONFIG_CN suffix:cn]) (push) Has been cancelled
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.io kube_config:KUBE_CONFIG_IO suffix:io]) (push) Has been cancelled
* perf: faq
* index
* delete dataset
* delete dataset
* perf: delete dataset
* init
* fix: faq
* refresh
* empty tip
* perf: delete type
* fix: some bugs (#6071)
* fix: publish channel doc link
* fix: checkbox disable hover style
* fix: huggingface.svg missing; update doc
* chore: update doc
* fix: typo
* fix: export log dateend;feat: file selector render (#6072)
* fix: export log dateend
* feat: file selector render
* perf: s3 controller
* team qpm limit & plan tracks (#6066)
* team qpm limit & plan tracks
* api entry qpm
* perf: computed days
* Revert "api entry qpm"
This reverts commit 1210c07217.
* perf: code
* system qpm limit
* system qpm limit
---------
Co-authored-by: archer <545436317@qq.com>
* perf: track
* remove export chat test
* doc
* feat: global agent (#6057)
* feat: global agent
* fix: agent
* fix: order display
* CHORE
* feat: error page log
* fix: var update
---------
Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>
Co-authored-by: heheer <heheer@sealos.io>
Co-authored-by: Roy <whoeverimf5@gmail.com>
308 lines
9.1 KiB
TypeScript
308 lines
9.1 KiB
TypeScript
import { Client, type RemoveOptions, type CopyConditions, S3Error } from 'minio';
|
||
import {
|
||
type CreatePostPresignedUrlOptions,
|
||
type CreatePostPresignedUrlParams,
|
||
type CreatePostPresignedUrlResult,
|
||
type S3OptionsType,
|
||
type createPreviewUrlParams,
|
||
CreateGetPresignedUrlParamsSchema
|
||
} from '../type';
|
||
import { defaultS3Options, getSystemMaxFileSize, Mimes } from '../constants';
|
||
import path from 'node:path';
|
||
import { MongoS3TTL } from '../schema';
|
||
import { addHours, addMinutes } from 'date-fns';
|
||
import { addLog } from '../../system/log';
|
||
import { addS3DelJob } from '../mq';
|
||
import { type Readable } from 'node:stream';
|
||
import { type UploadFileByBufferParams, UploadFileByBufferSchema } from '../type';
|
||
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
|
||
|
||
export class S3BaseBucket {
|
||
private _client: Client;
|
||
private _externalClient: Client | undefined;
|
||
|
||
/**
|
||
*
|
||
* @param bucketName the bucket you want to operate
|
||
* @param options the options for the s3 client
|
||
*/
|
||
constructor(
|
||
public readonly bucketName: string,
|
||
public options: Partial<S3OptionsType> = defaultS3Options
|
||
) {
|
||
options = { ...defaultS3Options, ...options };
|
||
this.options = options;
|
||
this._client = new Client(options as S3OptionsType);
|
||
|
||
if (this.options.externalBaseURL) {
|
||
const externalBaseURL = new URL(this.options.externalBaseURL);
|
||
const endpoint = externalBaseURL.hostname;
|
||
const useSSL = externalBaseURL.protocol === 'https:';
|
||
|
||
const externalPort = externalBaseURL.port
|
||
? parseInt(externalBaseURL.port)
|
||
: useSSL
|
||
? 443
|
||
: undefined; // https 默认 443,其他情况让 MinIO 客户端使用默认端口
|
||
|
||
this._externalClient = new Client({
|
||
useSSL: useSSL,
|
||
endPoint: endpoint,
|
||
port: externalPort,
|
||
accessKey: options.accessKey,
|
||
secretKey: options.secretKey,
|
||
pathStyle: options.pathStyle,
|
||
region: options.region
|
||
});
|
||
}
|
||
|
||
const init = async () => {
|
||
// Not exists bucket, create it
|
||
if (!(await this.client.bucketExists(this.bucketName))) {
|
||
await this.client.makeBucket(this.bucketName);
|
||
}
|
||
await this.options.afterInit?.();
|
||
console.log(`S3 init success: ${this.bucketName}`);
|
||
};
|
||
if (this.options.init) {
|
||
init();
|
||
}
|
||
}
|
||
|
||
get client(): Client {
|
||
return this._client;
|
||
}
|
||
get externalClient(): Client {
|
||
return this._externalClient ?? this._client;
|
||
}
|
||
|
||
// TODO: 加到 MQ 里保障幂等
|
||
async move({
|
||
from,
|
||
to,
|
||
options
|
||
}: {
|
||
from: string;
|
||
to: string;
|
||
options?: CopyConditions;
|
||
}): Promise<void> {
|
||
await this.copy({
|
||
from,
|
||
to,
|
||
options: {
|
||
copyConditions: options,
|
||
temporary: false
|
||
}
|
||
});
|
||
await this.delete(from);
|
||
}
|
||
|
||
async copy({
|
||
from,
|
||
to,
|
||
options
|
||
}: {
|
||
from: string;
|
||
to: string;
|
||
options?: {
|
||
temporary?: boolean;
|
||
copyConditions?: CopyConditions;
|
||
};
|
||
}): ReturnType<Client['copyObject']> {
|
||
const bucket = this.bucketName;
|
||
if (options?.temporary) {
|
||
await MongoS3TTL.create({
|
||
minioKey: to,
|
||
bucketName: this.bucketName,
|
||
expiredTime: addHours(new Date(), 24)
|
||
});
|
||
}
|
||
return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions);
|
||
}
|
||
|
||
async delete(objectKey: string, options?: RemoveOptions): Promise<void> {
|
||
try {
|
||
if (!objectKey) return Promise.resolve();
|
||
|
||
// 把连带的 parsed 数据一起删除
|
||
const fileParsedPrefix = `${path.dirname(objectKey)}/${path.basename(objectKey, path.extname(objectKey))}-parsed`;
|
||
await this.addDeleteJob({ prefix: fileParsedPrefix });
|
||
|
||
return await this.client.removeObject(this.bucketName, objectKey, options);
|
||
} catch (error) {
|
||
if (error instanceof S3Error) {
|
||
if (error.code === 'InvalidObjectName') {
|
||
addLog.warn(`${this.bucketName} delete object not found: ${objectKey}`, error);
|
||
return Promise.resolve();
|
||
}
|
||
}
|
||
return Promise.reject(error);
|
||
}
|
||
}
|
||
|
||
// 列出文件
|
||
listObjectsV2(
|
||
...params: Parameters<Client['listObjectsV2']> extends [string, ...infer R] ? R : never
|
||
) {
|
||
return this.client.listObjectsV2(this.bucketName, ...params);
|
||
}
|
||
|
||
// 上传文件
|
||
putObject(...params: Parameters<Client['putObject']> extends [string, ...infer R] ? R : never) {
|
||
return this.client.putObject(this.bucketName, ...params);
|
||
}
|
||
|
||
// 获取文件流
|
||
getFileStream(
|
||
...params: Parameters<Client['getObject']> extends [string, ...infer R] ? R : never
|
||
) {
|
||
return this.client.getObject(this.bucketName, ...params);
|
||
}
|
||
|
||
// 获取文件状态
|
||
async statObject(
|
||
...params: Parameters<Client['statObject']> extends [string, ...infer R] ? R : never
|
||
) {
|
||
try {
|
||
return await this.client.statObject(this.bucketName, ...params);
|
||
} catch (error) {
|
||
if (error instanceof S3Error && error.message === 'Not Found') {
|
||
return null;
|
||
}
|
||
return Promise.reject(error);
|
||
}
|
||
}
|
||
|
||
// 判断文件是否存在
|
||
async isObjectExists(key: string): Promise<boolean> {
|
||
try {
|
||
await this.client.statObject(this.bucketName, key);
|
||
return true;
|
||
} catch (err) {
|
||
if (err instanceof S3Error && err.message === 'Not Found') {
|
||
return false;
|
||
}
|
||
return Promise.reject(err);
|
||
}
|
||
}
|
||
|
||
// 将文件流转换为Buffer
|
||
async fileStreamToBuffer(stream: Readable): Promise<Buffer> {
|
||
const chunks: Buffer[] = [];
|
||
for await (const chunk of stream) {
|
||
chunks.push(chunk);
|
||
}
|
||
return Buffer.concat(chunks);
|
||
}
|
||
|
||
addDeleteJob(params: Omit<Parameters<typeof addS3DelJob>[0], 'bucketName'>) {
|
||
return addS3DelJob({ ...params, bucketName: this.bucketName });
|
||
}
|
||
|
||
async createPostPresignedUrl(
|
||
params: CreatePostPresignedUrlParams,
|
||
options: CreatePostPresignedUrlOptions = {}
|
||
): Promise<CreatePostPresignedUrlResult> {
|
||
try {
|
||
const { expiredHours, maxFileSize = getSystemMaxFileSize() } = options;
|
||
const formatMaxFileSize = maxFileSize * 1024 * 1024;
|
||
const filename = params.filename;
|
||
const ext = path.extname(filename).toLowerCase();
|
||
const contentType = Mimes[ext as keyof typeof Mimes] ?? 'application/octet-stream';
|
||
|
||
const key = params.rawKey;
|
||
|
||
const policy = this.externalClient.newPostPolicy();
|
||
policy.setKey(key);
|
||
policy.setBucket(this.bucketName);
|
||
policy.setContentType(contentType);
|
||
if (formatMaxFileSize) {
|
||
policy.setContentLengthRange(1, formatMaxFileSize);
|
||
}
|
||
policy.setExpires(addMinutes(new Date(), 10));
|
||
policy.setUserMetaData({
|
||
'content-disposition': `attachment; filename="${encodeURIComponent(filename)}"`,
|
||
'origin-filename': encodeURIComponent(filename),
|
||
'upload-time': new Date().toISOString(),
|
||
...params.metadata
|
||
});
|
||
|
||
const { formData, postURL } = await this.externalClient.presignedPostPolicy(policy);
|
||
|
||
if (expiredHours) {
|
||
await MongoS3TTL.create({
|
||
minioKey: key,
|
||
bucketName: this.bucketName,
|
||
expiredTime: addHours(new Date(), expiredHours)
|
||
});
|
||
}
|
||
|
||
return {
|
||
url: postURL,
|
||
fields: formData,
|
||
maxSize: formatMaxFileSize
|
||
};
|
||
} catch (error) {
|
||
addLog.error('Failed to create post presigned url', error);
|
||
return Promise.reject('Failed to create post presigned url');
|
||
}
|
||
}
|
||
|
||
async createExternalUrl(params: createPreviewUrlParams) {
|
||
const parsed = CreateGetPresignedUrlParamsSchema.parse(params);
|
||
|
||
const { key, expiredHours } = parsed;
|
||
const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟
|
||
|
||
return await this.externalClient.presignedGetObject(this.bucketName, key, expires);
|
||
}
|
||
|
||
async createPreviewUrl(params: createPreviewUrlParams) {
|
||
const parsed = CreateGetPresignedUrlParamsSchema.parse(params);
|
||
|
||
const { key, expiredHours } = parsed;
|
||
const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟
|
||
|
||
return await this.client.presignedGetObject(this.bucketName, key, expires);
|
||
}
|
||
|
||
async uploadFileByBuffer(params: UploadFileByBufferParams) {
|
||
const { key, buffer, contentType } = UploadFileByBufferSchema.parse(params);
|
||
|
||
await MongoS3TTL.create({
|
||
minioKey: key,
|
||
bucketName: this.bucketName,
|
||
expiredTime: addHours(new Date(), 1)
|
||
});
|
||
await this.putObject(key, buffer, undefined, {
|
||
'Content-Type': contentType || 'application/octet-stream'
|
||
});
|
||
|
||
return {
|
||
key,
|
||
accessUrl: await this.createExternalUrl({
|
||
key,
|
||
expiredHours: 2
|
||
})
|
||
};
|
||
}
|
||
|
||
// 对外包装的方法
|
||
// 获取文件元数据
|
||
async getFileMetadata(key: string) {
|
||
const stat = await this.statObject(key);
|
||
if (!stat) return;
|
||
|
||
const contentLength = stat.size;
|
||
const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
|
||
const extension = parseFileExtensionFromUrl(filename);
|
||
const contentType: string = stat.metaData['content-type'];
|
||
return {
|
||
filename,
|
||
extension,
|
||
contentType,
|
||
contentLength
|
||
};
|
||
}
|
||
}
|