feat: migrate to fastgpt storage sdk

This commit is contained in:
xqvvu 2025-12-17 10:16:09 +08:00
parent ab743b9358
commit b2e3e93fb3
No known key found for this signature in database
GPG Key ID: 8CDDE20C9D911EE4
53 changed files with 5686 additions and 560 deletions

View File

@ -2,7 +2,7 @@
"name": "@fastgpt/global",
"version": "1.0.0",
"dependencies": {
"@fastgpt-sdk/plugin": "0.2.16",
"@fastgpt-sdk/plugin": "0.2.17",
"@apidevtools/swagger-parser": "^10.1.0",
"@bany/curl-to-json": "^1.2.8",
"axios": "^1.12.1",

View File

@ -10,18 +10,17 @@ import {
type CreatePostPresignedUrlOptions,
type CreatePostPresignedUrlParams,
type CreatePostPresignedUrlResult,
type S3OptionsType,
type createPreviewUrlParams,
CreateGetPresignedUrlParamsSchema
} from '../type';
import { defaultS3Options, getSystemMaxFileSize, Mimes } from '../constants';
import { getSystemMaxFileSize, Mimes } from '../constants';
import path from 'node:path';
import { MongoS3TTL } from '../schema';
import { addHours, addMinutes } from 'date-fns';
import { addHours, addMinutes, differenceInSeconds } from 'date-fns';
import { addLog } from '../../system/log';
import { addS3DelJob } from '../mq';
import { type Readable } from 'node:stream';
import { type UploadFileByBufferParams, UploadFileByBufferSchema } from '../type';
import { type IStorage } from '@fastgpt-sdk/storage';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
// Check if the error is a "file not found" type error, which should be treated as success
@ -45,82 +44,26 @@ export const isFileNotFoundError = (error: any): boolean => {
};
export class S3BaseBucket {
private _client: Client;
private _externalClient: Client | undefined;
/**
*
* @param bucketName the bucket you want to operate
* @param options the options for the s3 client
*/
constructor(
public readonly bucketName: string,
public options: Partial<S3OptionsType> = defaultS3Options
) {
options = { ...defaultS3Options, ...options };
this.options = options;
this._client = new Client(options as S3OptionsType);
private readonly _client: IStorage,
private readonly _externalClient: IStorage | undefined
) {}
if (this.options.externalBaseURL) {
const externalBaseURL = new URL(this.options.externalBaseURL);
const endpoint = externalBaseURL.hostname;
const useSSL = externalBaseURL.protocol === 'https:';
const externalPort = externalBaseURL.port
? parseInt(externalBaseURL.port)
: useSSL
? 443
: undefined; // https 默认 443其他情况让 MinIO 客户端使用默认端口
this._externalClient = new Client({
useSSL: useSSL,
endPoint: endpoint,
port: externalPort,
accessKey: options.accessKey,
secretKey: options.secretKey,
pathStyle: options.pathStyle,
region: options.region
});
}
const init = async () => {
// Not exists bucket, create it
if (!(await this.client.bucketExists(this.bucketName))) {
await this.client.makeBucket(this.bucketName);
}
await this.options.afterInit?.();
console.log(`S3 init success: ${this.bucketName}`);
};
if (this.options.init) {
init();
}
}
get client(): Client {
get client(): IStorage {
return this._client;
}
get externalClient(): Client {
get externalClient(): IStorage {
return this._externalClient ?? this._client;
}
get bucketName(): string {
return this.client.bucketName;
}
// TODO: 加到 MQ 里保障幂等
async move({
from,
to,
options
}: {
from: string;
to: string;
options?: CopyConditions;
}): Promise<void> {
await this.copy({
from,
to,
options: {
copyConditions: options,
temporary: false
}
});
async move({ from, to }: { from: string; to: string }): Promise<void> {
await this.copy({ from, to, options: { temporary: false } });
await this.removeObject(from);
}
@ -133,10 +76,8 @@ export class S3BaseBucket {
to: string;
options?: {
temporary?: boolean;
copyConditions?: CopyConditions;
};
}): ReturnType<Client['copyObject']> {
const bucket = this.bucketName;
}) {
if (options?.temporary) {
await MongoS3TTL.create({
minioKey: to,
@ -144,11 +85,11 @@ export class S3BaseBucket {
expiredTime: addHours(new Date(), 24)
});
}
return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions);
return this.client.copyObjectInSelfBucket({ sourceKey: from, targetKey: to });
}
async removeObject(objectKey: string, options?: RemoveOptions): Promise<void> {
return this.client.removeObject(this.bucketName, objectKey, options).catch((err) => {
async removeObject(objectKey: string): Promise<void> {
this.client.deleteObject({ key: objectKey }).catch((err) => {
if (isFileNotFoundError(err)) {
return Promise.resolve();
}
@ -160,66 +101,17 @@ export class S3BaseBucket {
});
}
// 列出文件
listObjectsV2(
...params: Parameters<Client['listObjectsV2']> extends [string, ...infer R] ? R : never
) {
return this.client.listObjectsV2(this.bucketName, ...params);
}
// 上传文件
putObject(...params: Parameters<Client['putObject']> extends [string, ...infer R] ? R : never) {
return this.client.putObject(this.bucketName, ...params);
}
// 获取文件流
getFileStream(
...params: Parameters<Client['getObject']> extends [string, ...infer R] ? R : never
) {
return this.client.getObject(this.bucketName, ...params);
}
// 获取文件状态
async statObject(
...params: Parameters<Client['statObject']> extends [string, ...infer R] ? R : never
) {
try {
return await this.client.statObject(this.bucketName, ...params);
} catch (error) {
if (error instanceof S3Error && error.message === 'Not Found') {
return null;
}
return Promise.reject(error);
}
}
// 判断文件是否存在
async isObjectExists(key: string): Promise<boolean> {
try {
await this.client.statObject(this.bucketName, key);
return true;
} catch (err) {
if (err instanceof S3Error && err.message === 'Not Found') {
return false;
}
return Promise.reject(err);
}
}
// 将文件流转换为Buffer
async fileStreamToBuffer(stream: Readable): Promise<Buffer> {
const chunks: Buffer[] = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
return Buffer.concat(chunks);
}
addDeleteJob(params: Omit<Parameters<typeof addS3DelJob>[0], 'bucketName'>) {
return addS3DelJob({ ...params, bucketName: this.bucketName });
}
async createPostPresignedUrl(
async isObjectExists(key: string) {
const { exists } = await this.client.checkObjectExists({ key });
return exists ?? false;
}
async createPresignedPutUrl(
params: CreatePostPresignedUrlParams,
options: CreatePostPresignedUrlOptions = {}
): Promise<CreatePostPresignedUrlResult> {
@ -229,42 +121,39 @@ export class S3BaseBucket {
const filename = params.filename;
const ext = path.extname(filename).toLowerCase();
const contentType = Mimes[ext as keyof typeof Mimes] ?? 'application/octet-stream';
const expiredSeconds = differenceInSeconds(addMinutes(new Date(), 10), new Date());
const key = params.rawKey;
const policy = this.externalClient.newPostPolicy();
policy.setKey(key);
policy.setBucket(this.bucketName);
policy.setContentType(contentType);
if (formatMaxFileSize) {
policy.setContentLengthRange(1, formatMaxFileSize);
}
policy.setExpires(addMinutes(new Date(), 10));
policy.setUserMetaData({
'content-disposition': `attachment; filename="${encodeURIComponent(filename)}"`,
'origin-filename': encodeURIComponent(filename),
'upload-time': new Date().toISOString(),
...params.metadata
const { metadata, putUrl } = await this.externalClient.generatePresignedPutUrl({
key: params.rawKey,
expiredSeconds,
contentType,
metadata: {
contentDisposition: `attachment; filename="${encodeURIComponent(filename)}"`,
originFilename: encodeURIComponent(filename),
uploadTime: new Date().toISOString(),
...params.metadata
}
});
const { formData, postURL } = await this.externalClient.presignedPostPolicy(policy);
if (expiredHours) {
await MongoS3TTL.create({
minioKey: key,
minioKey: params.rawKey,
bucketName: this.bucketName,
expiredTime: addHours(new Date(), expiredHours)
});
}
return {
url: postURL,
fields: formData,
url: putUrl,
fields: {
...metadata,
key: params.rawKey
},
maxSize: formatMaxFileSize
};
} catch (error) {
addLog.error('Failed to create post presigned url', error);
return Promise.reject('Failed to create post presigned url');
addLog.error('Failed to create presigned put url', error);
return Promise.reject('Failed to create presigned put url');
}
}
@ -274,7 +163,7 @@ export class S3BaseBucket {
const { key, expiredHours } = parsed;
const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟
return await this.externalClient.presignedGetObject(this.bucketName, key, expires);
return await this.externalClient.generatePresignedGetUrl({ key, expiredSeconds: expires });
}
async createPreviewUrl(params: createPreviewUrlParams) {
@ -283,7 +172,7 @@ export class S3BaseBucket {
const { key, expiredHours } = parsed;
const expires = expiredHours ? expiredHours * 60 * 60 : 30 * 60; // expires 的单位是秒 默认 30 分钟
return await this.client.presignedGetObject(this.bucketName, key, expires);
return await this.client.generatePresignedGetUrl({ key, expiredSeconds: expires });
}
async uploadFileByBuffer(params: UploadFileByBufferParams) {
@ -294,8 +183,11 @@ export class S3BaseBucket {
bucketName: this.bucketName,
expiredTime: addHours(new Date(), 1)
});
await this.putObject(key, buffer, undefined, {
'Content-Type': contentType || 'application/octet-stream'
await this.client.uploadObject({
key,
body: buffer,
contentType: contentType || 'application/octet-stream'
});
return {
@ -307,16 +199,15 @@ export class S3BaseBucket {
};
}
// 对外包装的方法
// 获取文件元数据
async getFileMetadata(key: string) {
const stat = await this.statObject(key);
if (!stat) return;
const metadataResponse = await this.client.getObjectMetadata({ key });
if (!metadataResponse) return;
const contentLength = stat.size;
const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
const contentLength = metadataResponse.contentLength;
const filename: string = decodeURIComponent(metadataResponse.metadata.originFilename || '');
const extension = parseFileExtensionFromUrl(filename);
const contentType: string = stat.metaData['content-type'];
const contentType: string = metadataResponse.contentType || 'application/octet-stream';
return {
filename,
extension,
@ -324,4 +215,11 @@ export class S3BaseBucket {
contentLength
};
}
async getFileStream(key: string) {
const downloadResponse = await this.client.downloadObject({ key });
if (!downloadResponse) return;
return downloadResponse.body;
}
}

View File

@ -1,9 +1,80 @@
import { S3BaseBucket } from './base';
import { S3Buckets } from '../constants';
import { type S3OptionsType } from '../type';
import { createDefaultStorageOptions } from '../constants';
import {
type IAwsS3CompatibleStorageOptions,
createStorage,
type ICosStorageOptions,
type IOssStorageOptions,
type IStorage
} from '@fastgpt-sdk/storage';
export class S3PrivateBucket extends S3BaseBucket {
constructor(options?: Partial<S3OptionsType>) {
super(S3Buckets.private, options);
constructor() {
const { vendor, privateBucket, externalBaseUrl, credentials, region, ...options } =
createDefaultStorageOptions();
let config: any = {};
let externalConfig: any = {};
if (vendor === 'minio') {
config = {
region,
vendor,
credentials,
forcePathStyle: true,
endpoint: options.endpoint!,
maxRetries: options.maxRetries!
} as Omit<IAwsS3CompatibleStorageOptions, 'bucket'>;
externalConfig = {
...config,
endpoint: externalBaseUrl
};
} else if (vendor === 'aws-s3') {
config = {
region,
vendor,
credentials,
endpoint: options.endpoint!,
maxRetries: options.maxRetries!
} as Omit<IAwsS3CompatibleStorageOptions, 'bucket'>;
externalConfig = {
...config,
endpoint: externalBaseUrl
};
} else if (vendor === 'cos') {
config = {
region,
vendor,
credentials,
proxy: options.proxy,
domain: options.domain,
protocol: options.protocol,
useAccelerate: options.useAccelerate
} as Omit<ICosStorageOptions, 'bucket'>;
} else if (vendor === 'oss') {
config = {
region,
vendor,
credentials,
endpoint: options.endpoint!,
cname: options.cname,
internal: options.internal,
secure: options.secure,
enableProxy: options.enableProxy
} as Omit<IOssStorageOptions, 'bucket'>;
}
const client = createStorage({ bucket: privateBucket, ...config });
let externalClient: IStorage | undefined = undefined;
if (externalBaseUrl) {
externalClient = createStorage({ bucket: privateBucket, ...externalConfig });
}
super(client, externalClient);
client.ensureBucket();
if (externalClient) {
externalClient.ensureBucket();
}
}
}

View File

@ -1,51 +1,92 @@
import { S3BaseBucket } from './base';
import { S3Buckets } from '../constants';
import { type S3OptionsType } from '../type';
import { createDefaultStorageOptions } from '../constants';
import {
type IAwsS3CompatibleStorageOptions,
type ICosStorageOptions,
type IOssStorageOptions,
createStorage,
type IStorage,
type MinioStorageAdapter
} from '@fastgpt-sdk/storage';
export class S3PublicBucket extends S3BaseBucket {
constructor(options?: Partial<S3OptionsType>) {
super(S3Buckets.public, {
...options,
afterInit: async () => {
const bucket = this.bucketName;
const policy = JSON.stringify({
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Principal: '*',
Action: 's3:GetObject',
Resource: `arn:aws:s3:::${bucket}/*`
}
]
});
try {
await this.client.setBucketPolicy(bucket, policy);
} catch (error) {
// NOTE: maybe it was a cloud S3 that doesn't allow us to set the policy, so that cause the error,
// maybe we can ignore the error, or we have other plan to handle this.
console.error('Failed to set bucket policy:', error);
}
}
constructor() {
const { vendor, publicBucket, externalBaseUrl, credentials, region, ...options } =
createDefaultStorageOptions();
let config: any = {};
let externalConfig: any = {};
if (vendor === 'minio') {
config = {
region,
vendor,
credentials,
forcePathStyle: true,
endpoint: options.endpoint!,
maxRetries: options.maxRetries!
} as Omit<IAwsS3CompatibleStorageOptions, 'bucket'>;
externalConfig = {
...config,
endpoint: externalBaseUrl
};
} else if (vendor === 'aws-s3') {
config = {
region,
vendor,
credentials,
endpoint: options.endpoint!,
maxRetries: options.maxRetries!
} as Omit<IAwsS3CompatibleStorageOptions, 'bucket'>;
externalConfig = {
...config,
endpoint: externalBaseUrl
};
} else if (vendor === 'cos') {
config = {
region,
vendor,
credentials,
proxy: options.proxy,
domain: options.domain,
protocol: options.protocol,
useAccelerate: options.useAccelerate
} as Omit<ICosStorageOptions, 'bucket'>;
} else if (vendor === 'oss') {
config = {
region,
vendor,
credentials,
endpoint: options.endpoint!,
cname: options.cname,
internal: options.internal,
secure: options.secure,
enableProxy: options.enableProxy
} as Omit<IOssStorageOptions, 'bucket'>;
}
const client = createStorage({ bucket: publicBucket, ...config });
let externalClient: IStorage | undefined = undefined;
if (externalBaseUrl) {
externalClient = createStorage({ bucket: publicBucket, ...externalConfig });
}
super(client, externalClient);
client.ensureBucket().then(() => {
if (vendor !== 'minio') return;
(client as MinioStorageAdapter).ensurePublicBucketPolicy();
});
if (externalClient) {
externalClient.ensureBucket().then(() => {
if (vendor !== 'minio') return;
(externalClient as MinioStorageAdapter).ensurePublicBucketPolicy();
});
}
}
createPublicUrl(objectKey: string): string {
const protocol = this.options.useSSL ? 'https' : 'http';
const hostname = this.options.endPoint;
const port = this.options.port;
const bucket = this.bucketName;
const url = new URL(`${protocol}://${hostname}:${port}/${bucket}/${objectKey}`);
if (this.options.externalBaseURL) {
const externalBaseURL = new URL(this.options.externalBaseURL);
url.port = externalBaseURL.port;
url.hostname = externalBaseURL.hostname;
url.protocol = externalBaseURL.protocol;
}
return url.toString();
return this.externalClient.generatePublicGetUrl({ key: objectKey }).publicGetUrl;
}
}

View File

@ -1,6 +1,9 @@
import { HttpProxyAgent } from 'http-proxy-agent';
import { HttpsProxyAgent } from 'https-proxy-agent';
import type { ClientOptions } from 'minio';
import type {
IAwsS3CompatibleStorageOptions,
ICosStorageOptions,
IOssStorageOptions,
IStorageOptions
} from '@fastgpt-sdk/storage';
export const Mimes = {
'.gif': 'image/gif',
@ -25,24 +28,9 @@ export const Mimes = {
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
} as const;
export const defaultS3Options: {
externalBaseURL?: string;
afterInit?: () => Promise<void> | void;
init?: boolean;
} & ClientOptions = {
useSSL: process.env.S3_USE_SSL === 'true',
endPoint: process.env.S3_ENDPOINT || 'localhost',
externalBaseURL: process.env.S3_EXTERNAL_BASE_URL,
accessKey: process.env.S3_ACCESS_KEY || 'minioadmin',
secretKey: process.env.S3_SECRET_KEY || 'minioadmin',
port: process.env.S3_PORT ? parseInt(process.env.S3_PORT) : 9000,
pathStyle: process.env.S3_PATH_STYLE === 'false' ? false : true,
region: process.env.S3_REGION || undefined
};
export const S3Buckets = {
public: process.env.S3_PUBLIC_BUCKET || 'fastgpt-public',
private: process.env.S3_PRIVATE_BUCKET || 'fastgpt-private'
public: process.env.STORAGE_PUBLIC_BUCKET || 'fastgpt-public',
private: process.env.STORAGE_PRIVATE_BUCKET || 'fastgpt-private'
} as const;
export const getSystemMaxFileSize = () => {
@ -51,3 +39,104 @@ export const getSystemMaxFileSize = () => {
};
export const S3_KEY_PATH_INVALID_CHARS = /[|\\/]/;
export function createDefaultStorageOptions() {
const vendor = (process.env.STORAGE_VENDOR || 'minio') as IStorageOptions['vendor'];
switch (vendor) {
case 'minio': {
return {
vendor: 'minio',
forcePathStyle: true,
externalBaseUrl: process.env.STORAGE_EXTERNAL_BASE_URL || undefined,
endpoint: process.env.STORAGE_S3_ENDPOINT || 'http://localhost:9000',
region: process.env.STORAGE_REGION || 'us-east-1',
publicBucket: process.env.STORAGE_PUBLIC_BUCKET || 'fastgpt-public',
privateBucket: process.env.STORAGE_PRIVATE_BUCKET || 'fastgpt-private',
credentials: {
accessKeyId: process.env.STORAGE_ACCESS_KEY_ID || 'minioadmin',
secretAccessKey: process.env.STORAGE_SECRET_ACCESS_KEY || 'minioadmin'
},
maxRetries: process.env.STORAGE_S3_MAX_RETRIES
? parseInt(process.env.STORAGE_S3_MAX_RETRIES)
: 3
} satisfies Omit<IAwsS3CompatibleStorageOptions, 'bucket'> & {
publicBucket: string;
privateBucket: string;
externalBaseUrl?: string;
};
}
case 'aws-s3': {
return {
vendor: 'aws-s3',
forcePathStyle: process.env.STORAGE_S3_FORCE_PATH_STYLE === 'true' ? true : false,
externalBaseUrl: process.env.STORAGE_EXTERNAL_BASE_URL || undefined,
endpoint: process.env.STORAGE_S3_ENDPOINT || '',
region: process.env.STORAGE_REGION || 'us-east-1',
publicBucket: process.env.STORAGE_PUBLIC_BUCKET || 'fastgpt-public',
privateBucket: process.env.STORAGE_PRIVATE_BUCKET || 'fastgpt-private',
credentials: {
accessKeyId: process.env.STORAGE_ACCESS_KEY_ID || '',
secretAccessKey: process.env.STORAGE_SECRET_ACCESS_KEY || ''
},
maxRetries: process.env.STORAGE_S3_MAX_RETRIES
? parseInt(process.env.STORAGE_S3_MAX_RETRIES)
: 3
} satisfies Omit<IAwsS3CompatibleStorageOptions, 'bucket'> & {
publicBucket: string;
privateBucket: string;
externalBaseUrl?: string;
};
}
case 'cos': {
return {
vendor: 'cos',
externalBaseUrl: process.env.STORAGE_EXTERNAL_BASE_URL || undefined,
region: process.env.STORAGE_REGION || 'ap-shanghai',
publicBucket: process.env.STORAGE_PUBLIC_BUCKET || 'fastgpt-public',
privateBucket: process.env.STORAGE_PRIVATE_BUCKET || 'fastgpt-private',
credentials: {
accessKeyId: process.env.STORAGE_ACCESS_KEY_ID || '',
secretAccessKey: process.env.STORAGE_SECRET_ACCESS_KEY || ''
},
protocol: (process.env.STORAGE_COS_PROTOCOL as 'https:' | 'http:' | undefined) || 'https:',
useAccelerate: process.env.STORAGE_COS_USE_ACCELERATE === 'true' ? true : false,
domain: process.env.STORAGE_COS_CNAME_DOMAIN || undefined,
proxy: process.env.STORAGE_COS_PROXY || undefined
} satisfies Omit<ICosStorageOptions, 'bucket'> & {
publicBucket: string;
privateBucket: string;
externalBaseUrl?: string;
};
}
case 'oss': {
return {
vendor: 'oss',
externalBaseUrl: process.env.STORAGE_EXTERNAL_BASE_URL || undefined,
endpoint: process.env.STORAGE_OSS_ENDPOINT || '',
region: process.env.STORAGE_REGION || 'oss-cn-hangzhou',
publicBucket: process.env.STORAGE_PUBLIC_BUCKET || 'fastgpt-public',
privateBucket: process.env.STORAGE_PRIVATE_BUCKET || 'fastgpt-private',
credentials: {
accessKeyId: process.env.STORAGE_ACCESS_KEY_ID || '',
secretAccessKey: process.env.STORAGE_SECRET_ACCESS_KEY || ''
},
cname: process.env.STORAGE_OSS_CNAME === 'true' ? true : false,
internal: process.env.STORAGE_OSS_INTERNAL === 'true' ? true : false,
secure: process.env.STORAGE_OSS_SECURE === 'true' ? true : false,
enableProxy: process.env.STORAGE_OSS_ENABLE_PROXY === 'false' ? false : true
} satisfies Omit<IOssStorageOptions, 'bucket'> & {
publicBucket: string;
privateBucket: string;
externalBaseUrl?: string;
};
}
default: {
throw new Error(`Unsupported storage vendor: ${vendor}`);
}
}
}

View File

@ -4,8 +4,8 @@ import { addLog } from '../system/log';
import { startS3DelWorker } from './mq';
export function initS3Buckets() {
const publicBucket = new S3PublicBucket({ init: true });
const privateBucket = new S3PrivateBucket({ init: true });
const publicBucket = new S3PublicBucket();
const privateBucket = new S3PrivateBucket();
global.s3BucketMap = {
[publicBucket.bucketName]: publicBucket,

View File

@ -1,8 +1,8 @@
import { getQueue, getWorker, QueueNames } from '../bullmq';
import { addLog } from '../system/log';
import path from 'path';
import { batchRun } from '@fastgpt/global/common/system/utils';
import { isFileNotFoundError, type S3BaseBucket } from './buckets/base';
import { batchRun, retryFn } from '@fastgpt/global/common/system/utils';
import pLimit from 'p-limit';
export type S3MQJobData = {
key?: string;
@ -23,6 +23,7 @@ const jobOption = {
type: 'exponential'
}
};
export const addS3DelJob = async (data: S3MQJobData): Promise<void> => {
const queue = getQueue<S3MQJobData>(QueueNames.s3FileDelete);
const jobId = (() => {
@ -40,77 +41,15 @@ export const addS3DelJob = async (data: S3MQJobData): Promise<void> => {
await queue.add('delete-s3-files', data, { jobId, ...jobOption });
};
export const prefixDel = async (bucket: S3BaseBucket, prefix: string) => {
addLog.debug(`[S3 delete] delete prefix: ${prefix}`);
let tasks: Promise<any>[] = [];
return new Promise<void>((resolve, reject) => {
let timer: NodeJS.Timeout;
const stream = bucket.listObjectsV2(prefix, true);
let settled = false;
const finish = (error?: any) => {
if (settled) return;
settled = true;
if (timer) {
clearTimeout(timer);
}
stream?.removeAllListeners?.();
stream?.destroy?.();
if (error) {
addLog.error(`[S3 delete] delete prefix failed`, error);
reject(error);
} else {
resolve();
}
};
// stream 可能会中断,没有触发 end 和 error导致 promise 不返回,需要增加定时器兜底。
timer = setTimeout(() => {
addLog.error(`[S3 delete] delete prefix timeout: ${prefix}`);
finish('Timeout');
}, 60000);
stream.on('data', (file) => {
if (!file.name) return;
tasks.push(bucket.removeObject(file.name));
});
stream.on('end', async () => {
if (tasks.length === 0) {
return finish();
}
if (timer) {
clearTimeout(timer);
}
const results = await Promise.allSettled(tasks);
const failed = results.some((r) => r.status === 'rejected');
if (failed) {
return finish('Some deletes failed');
}
finish();
});
stream.on('error', (err) => {
if (isFileNotFoundError(err)) {
return finish();
}
addLog.error(`[S3 delete] delete prefix: ${prefix} error`, err);
return finish(err);
});
stream.on('pause', () => {
addLog.warn(`[S3 delete] delete prefix: ${prefix} paused`);
stream.resume();
});
});
};
export const startS3DelWorker = async () => {
const limit = pLimit(50);
return getWorker<S3MQJobData>(
QueueNames.s3FileDelete,
async (job) => {
let { prefix, bucketName, key, keys } = job.data;
const bucket = global.s3BucketMap[bucketName];
if (!bucket) {
addLog.error(`Bucket not found: ${bucketName}`);
return;
@ -121,17 +60,20 @@ export const startS3DelWorker = async () => {
}
if (keys) {
addLog.debug(`[S3 delete] delete keys: ${keys.length}`);
await bucket.client.deleteObjectsByMultiKeys({ keys });
await batchRun(keys, async (key) => {
await bucket.removeObject(key);
// Delete parsed
if (!key.includes('-parsed/')) {
const fileParsedPrefix = `${path.dirname(key)}/${path.basename(key, path.extname(key))}-parsed`;
await prefixDel(bucket, fileParsedPrefix);
}
if (key.includes('-parsed/')) return;
const fileParsedPrefix = `${path.dirname(key)}/${path.basename(key, path.extname(key))}-parsed`;
await bucket.client.deleteObjectsByPrefix({ prefix: fileParsedPrefix });
});
}
if (prefix) {
await prefixDel(bucket, prefix);
addLog.info(`[S3 delete] delete prefix: ${prefix}`);
const tasks = [];
const p = limit(() => retryFn(() => bucket.client.deleteObjectsByPrefix({ prefix })));
tasks.push(p);
await Promise.all(tasks);
addLog.info(`[S3 delete] delete prefix: ${prefix} success`);
}
},
{

View File

@ -25,7 +25,7 @@ class S3AvatarSource extends S3PublicBucket {
}) {
const { fileKey } = getFileS3Key.avatar({ teamId, filename });
return this.createPostPresignedUrl(
return this.createPresignedPutUrl(
{ filename, rawKey: fileKey },
{
expiredHours: autoExpired ? 1 : undefined, // 1 Hours

View File

@ -60,21 +60,24 @@ export class S3ChatSource extends S3PrivateBucket {
async createUploadChatFileURL(params: CheckChatFileKeys) {
const { appId, chatId, uId, filename, expiredTime } = ChatFileUploadSchema.parse(params);
const { fileKey } = getFileS3Key.chat({ appId, chatId, uId, filename });
return await this.createPostPresignedUrl(
return await this.createPresignedPutUrl(
{ rawKey: fileKey, filename },
{ expiredHours: expiredTime ? differenceInHours(expiredTime, new Date()) : 24 }
);
}
deleteChatFilesByPrefix(params: DelChatFileByPrefixParams) {
async deleteChatFilesByPrefix(params: DelChatFileByPrefixParams) {
const { appId, chatId, uId } = DelChatFileByPrefixSchema.parse(params);
const prefix = [S3Sources.chat, appId, uId, chatId].filter(Boolean).join('/');
return this.addDeleteJob({ prefix });
await this.addDeleteJob({ prefix });
global.s3BucketMap[S3Buckets.public]?.addDeleteJob({ prefix });
return prefix;
}
deleteChatFileByKey(key: string) {
return this.addDeleteJob({ key });
this.addDeleteJob({ key });
return key;
}
async uploadChatFileByBuffer(params: UploadFileParams) {

View File

@ -1,6 +1,6 @@
import { S3Sources } from '../../type';
import { S3PrivateBucket } from '../../buckets/private';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
import streamConsumer from 'node:stream/consumers';
import {
type CreateGetDatasetFileURLParams,
CreateGetDatasetFileURLParamsSchema,
@ -46,7 +46,7 @@ export class S3DatasetSource extends S3PrivateBucket {
async createUploadDatasetFileURL(params: CreateUploadDatasetFileParams) {
const { filename, datasetId } = CreateUploadDatasetFileParamsSchema.parse(params);
const { fileKey } = getFileS3Key.dataset({ datasetId, filename });
return await this.createPostPresignedUrl({ rawKey: fileKey, filename }, { expiredHours: 3 });
return await this.createPresignedPutUrl({ rawKey: fileKey, filename }, { expiredHours: 3 });
}
// 单个键删除
@ -71,13 +71,14 @@ export class S3DatasetSource extends S3PrivateBucket {
}
async getDatasetBase64Image(key: string): Promise<string> {
const [stream, metadata] = await Promise.all([
this.getFileStream(key),
const [downloadResponse, fileMetadata] = await Promise.all([
this.client.downloadObject({ key }),
this.getFileMetadata(key)
]);
const buffer = await this.fileStreamToBuffer(stream);
const buffer = await streamConsumer.buffer(downloadResponse.body);
const base64 = buffer.toString('base64');
return `data:${metadata?.contentType || 'image/jpeg'};base64,${base64}`;
return `data:${fileMetadata?.contentType || 'image/jpeg'};base64,${base64}`;
}
async getDatasetFileRawText(params: GetDatasetFileContentParams) {
@ -95,16 +96,16 @@ export class S3DatasetSource extends S3PrivateBucket {
};
}
const [metadata, stream] = await Promise.all([
const [fileMetadata, downloadResponse] = await Promise.all([
this.getFileMetadata(fileId),
this.getFileStream(fileId)
this.client.downloadObject({ key: fileId })
]);
const extension = metadata?.extension || '';
const filename: string = decodeURIComponent(metadata?.filename || '');
const filename = fileMetadata?.filename || '';
const extension = fileMetadata?.extension || '';
const start = Date.now();
const buffer = await this.fileStreamToBuffer(stream);
const buffer = await streamConsumer.buffer(downloadResponse.body);
addLog.debug('get dataset file buffer', { time: Date.now() - start });
const encoding = detectFileEncoding(buffer);
@ -144,29 +145,20 @@ export class S3DatasetSource extends S3PrivateBucket {
const truncatedFilename = truncateFilename(filename);
const { fileKey: key } = getFileS3Key.dataset({ datasetId, filename: truncatedFilename });
const { stream, size } = (() => {
if ('buffer' in file) {
return {
stream: file.buffer,
size: file.buffer.length
};
}
return {
stream: file.stream,
size: file.size
};
})();
await MongoS3TTL.create({
minioKey: key,
bucketName: this.bucketName,
expiredTime: addHours(new Date(), 3)
});
await this.putObject(key, stream, size, {
'content-type': Mimes[path.extname(truncatedFilename) as keyof typeof Mimes],
'upload-time': new Date().toISOString(),
'origin-filename': encodeURIComponent(truncatedFilename)
await this.client.uploadObject({
key,
body: 'buffer' in file ? file.buffer : file.stream,
contentType: Mimes[path.extname(truncatedFilename) as keyof typeof Mimes],
metadata: {
uploadTime: new Date().toISOString(),
originFilename: encodeURIComponent(truncatedFilename)
}
});
return key;

View File

@ -8,6 +8,7 @@ import { MongoS3TTL } from '../../schema';
import { addMinutes } from 'date-fns';
import { getFileS3Key } from '../../utils';
import { createHash } from 'node:crypto';
import streamConsumer from 'node:stream/consumers';
export class S3RawTextSource extends S3PrivateBucket {
constructor() {
@ -16,10 +17,10 @@ export class S3RawTextSource extends S3PrivateBucket {
// 获取文件元数据
async getFilename(key: string) {
const stat = await this.statObject(key);
if (!stat) return '';
const metadataResponse = await this.client.getObjectMetadata({ key });
if (!metadataResponse) return '';
const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
const filename: string = decodeURIComponent(metadataResponse.metadata.originFilename || '');
return filename;
}
@ -38,10 +39,14 @@ export class S3RawTextSource extends S3PrivateBucket {
expiredTime: addMinutes(new Date(), 20)
});
await this.putObject(key, buffer, buffer.length, {
'content-type': 'text/plain',
'origin-filename': encodeURIComponent(sourceName),
'upload-time': new Date().toISOString()
await this.client.uploadObject({
key,
body: buffer,
contentType: 'text/plain',
metadata: {
originFilename: encodeURIComponent(sourceName),
uploadTime: new Date().toISOString()
}
});
return key;
@ -55,13 +60,16 @@ export class S3RawTextSource extends S3PrivateBucket {
if (!(await this.isObjectExists(key))) return null;
const [stream, filename] = await Promise.all([this.getFileStream(key), this.getFilename(key)]);
const [downloadResponse, fileMetadata] = await Promise.all([
this.client.downloadObject({ key }),
this.getFileMetadata(key)
]);
const buffer = await this.fileStreamToBuffer(stream);
const buffer = await streamConsumer.buffer(downloadResponse.body);
return {
text: buffer.toString('utf-8'),
filename
filename: fileMetadata?.filename || ''
};
}
}

View File

@ -1,5 +1,5 @@
import { z } from 'zod';
import type { defaultS3Options, Mimes } from './constants';
import type { Mimes } from './constants';
import type { S3BaseBucket } from './buckets/base';
export const S3MetadataSchema = z.object({
@ -15,8 +15,6 @@ export type S3Metadata = z.infer<typeof S3MetadataSchema>;
export type ContentType = (typeof Mimes)[keyof typeof Mimes];
export type ExtensionType = keyof typeof Mimes;
export type S3OptionsType = typeof defaultS3Options;
export const S3SourcesSchema = z.enum(['avatar', 'chat', 'dataset', 'temp', 'rawText']);
export const S3Sources = S3SourcesSchema.enum;
export type S3SourceType = z.infer<typeof S3SourcesSchema>;

View File

@ -121,10 +121,14 @@ export async function uploadImage2S3Bucket(
const base64Data = base64Img.split(',')[1] || base64Img;
const buffer = Buffer.from(base64Data, 'base64');
await bucket.putObject(uploadKey, buffer, buffer.length, {
'content-type': mimetype,
'upload-time': new Date().toISOString(),
'origin-filename': encodeURIComponent(filename)
await bucket.client.uploadObject({
key: uploadKey,
body: buffer,
contentType: mimetype,
metadata: {
uploadTime: new Date().toISOString(),
originFilename: encodeURIComponent(filename)
}
});
const now = new Date();
@ -189,16 +193,25 @@ export const getFileS3Key = {
appId,
chatId,
uId,
filename
filename,
isTool = false
}: {
chatId: string;
uId: string;
appId: string;
filename: string;
filename?: string;
isTool?: boolean;
}) => {
const { formatedFilename, extension } = getFormatedFilename(filename);
const basePrefix = [S3Sources.chat, appId, uId, chatId].filter(Boolean).join('/');
if (isTool) {
return {
fileKey: basePrefix,
fileParsedPrefix: ''
};
}
return {
fileKey: [basePrefix, `${formatedFilename}${extension ? `.${extension}` : ''}`].join('/'),
fileParsedPrefix: [basePrefix, `${formatedFilename}-parsed`].join('/')

View File

@ -1,11 +1,11 @@
const systemWhiteList = (() => {
const list: string[] = [];
if (process.env.S3_ENDPOINT) {
list.push(process.env.S3_ENDPOINT);
if (process.env.STORAGE_S3_ENDPOINT) {
list.push(process.env.STORAGE_S3_ENDPOINT);
}
if (process.env.S3_EXTERNAL_BASE_URL) {
if (process.env.STORAGE_EXTERNAL_BASE_URL) {
try {
const urlData = new URL(process.env.S3_EXTERNAL_BASE_URL);
const urlData = new URL(process.env.STORAGE_EXTERNAL_BASE_URL);
list.push(urlData.hostname);
} catch (error) {}
}

View File

@ -171,10 +171,12 @@ export const loadRequestMessages = async ({
const url = await (async () => {
if (item.key) {
try {
return await getS3ChatSource().createGetChatFileURL({
key: item.key,
external: false
});
return (
await getS3ChatSource().createGetChatFileURL({
key: item.key,
external: false
})
).getUrl;
} catch (error) {}
}
return imgUrl;

View File

@ -14,13 +14,15 @@ export const addPreviewUrlToChatItems = async (
async function addToChatflow(item: ChatItemType) {
for await (const value of item.value) {
if (value.type === ChatItemValueTypeEnum.file && value.file && value.file.key) {
value.file.url = await s3ChatSource.createGetChatFileURL({
const { getUrl: url } = await s3ChatSource.createGetChatFileURL({
key: value.file.key,
external: true
});
value.file.url = url;
}
}
}
async function addToWorkflowTool(item: ChatItemType) {
if (item.obj !== ChatRoleEnum.Human || !Array.isArray(item.value)) return;
@ -39,7 +41,7 @@ export const addPreviewUrlToChatItems = async (
for (const file of input.value) {
if (!file.key) continue;
const url = await getS3ChatSource().createGetChatFileURL({
const { getUrl: url } = await getS3ChatSource().createGetChatFileURL({
key: file.key,
external: true
});
@ -85,7 +87,7 @@ export const presignVariablesFileUrls = async ({
val.map(async (item) => {
if (!item.key) return item;
const url = await getS3ChatSource().createGetChatFileURL({
const { getUrl: url } = await getS3ChatSource().createGetChatFileURL({
key: item.key,
external: true
});

View File

@ -22,6 +22,7 @@ import { getNodeErrResponse } from '../utils';
import { splitCombineToolId } from '@fastgpt/global/core/app/tool/utils';
import { getAppVersionById } from '../../../../core/app/version/controller';
import { runHTTPTool } from '../../../app/http';
import { getFileS3Key } from '../../../../common/s3/utils';
type SystemInputConfigType = {
type: SystemToolSecretInputTypeEnum;
@ -53,6 +54,12 @@ export const dispatchRunTool = async (props: RunToolProps): Promise<RunToolRespo
node: { name, avatar, toolConfig, version, catchError }
} = props;
const {
uid: uId,
chatId = '',
runningAppInfo: { id: appId }
} = props;
const systemToolId = toolConfig?.systemTool?.toolId;
let toolInput: Record<string, any> = {};
@ -109,7 +116,8 @@ export const dispatchRunTool = async (props: RunToolProps): Promise<RunToolRespo
},
tool: {
id: formatToolId,
version: version || tool.versionList?.[0]?.value || ''
version: version || tool.versionList?.[0]?.value || '',
prefix: getFileS3Key.chat({ chatId, uId, appId, isTool: true }).fileKey
},
time: variables.cTime
},

View File

@ -139,10 +139,11 @@ export async function dispatchWorkFlow({
// Add preview url to query
...query.map(async (item) => {
if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) return;
item.file.url = await getS3ChatSource().createGetChatFileURL({
const { getUrl: url } = await getS3ChatSource().createGetChatFileURL({
key: item.file.key,
external: true
});
item.file.url = url;
}),
// Remove stopping sign
delAgentRuntimeStopSign({

View File

@ -42,11 +42,12 @@ export const dispatchPluginInput = async (
for (let i = 0; i < val.length; i++) {
const fileItem = val[i];
if (fileItem.key && !fileItem.url) {
val[i].url = await getS3ChatSource().createGetChatFileURL({
const { getUrl: url } = await getS3ChatSource().createGetChatFileURL({
key: fileItem.key,
external: true,
expiredHours: 1
});
val[i].url = url;
}
}
params[key] = val.map((item) => item.url);

View File

@ -3,6 +3,7 @@
"version": "1.0.0",
"type": "module",
"dependencies": {
"@fastgpt-sdk/storage": "workspace:*",
"@fastgpt/global": "workspace:*",
"@maxmind/geoip2-node": "^6.3.4",
"@modelcontextprotocol/sdk": "^1.24.0",
@ -38,6 +39,7 @@
"jsonwebtoken": "^9.0.2",
"lodash": "^4.17.21",
"mammoth": "^1.11.0",
"mime": "^4.1.0",
"minio": "^8.0.5",
"mongoose": "^8.10.1",
"multer": "2.0.2",
@ -52,6 +54,7 @@
"pg": "^8.10.0",
"pino": "^9.7.0",
"pino-opentelemetry-transport": "^1.0.1",
"proxy-agent": "^6.5.0",
"request-ip": "^3.3.0",
"tiktoken": "1.0.17",
"tunnel": "^0.0.6",

View File

@ -202,11 +202,13 @@ export async function authDatasetData({
imageId: datasetData.imageId,
imagePreivewUrl:
datasetData.imageId && isS3ObjectKey(datasetData.imageId, 'dataset')
? await getS3DatasetSource().createGetDatasetFileURL({
key: datasetData.imageId,
expiredHours: 1,
external: true
})
? (
await getS3DatasetSource().createGetDatasetFileURL({
key: datasetData.imageId,
expiredHours: 1,
external: true
})
).getUrl
: undefined,
chunkIndex: datasetData.chunkIndex,
indexes: datasetData.indexes,

View File

@ -0,0 +1 @@
lts/iron

200
packages/storage/README.md Normal file
View File

@ -0,0 +1,200 @@
# @fastgpt-sdk/storage
FastGPT 的对象存储 SDK提供 **统一的、与厂商无关的**存储接口S3/MinIO/OSS/COS 等),用于上传、下载、删除、列举对象以及获取元数据。
> 本包为 ESM`"type": "module"`),并要求 Node.js **>= 20**。
## 安装
```bash
pnpm add @fastgpt-sdk/storage
```
## 快速开始
```ts
import { createStorage } from '@fastgpt-sdk/storage';
import { createWriteStream } from 'node:fs';
const storage = createStorage({
vendor: 'minio',
bucket: 'my-bucket',
region: 'us-east-1',
endpoint: 'http://127.0.0.1:9000',
credentials: {
accessKeyId: process.env.MINIO_ACCESS_KEY ?? '',
secretAccessKey: process.env.MINIO_SECRET_KEY ?? ''
},
// minio 常见配置:若你的服务不支持 virtual-host 访问方式,可打开它
forcePathStyle: true
});
// 1) 确保 bucket 存在(不存在则尝试创建)
await storage.ensureBucket();
// 2) 上传
await storage.uploadObject({
key: 'demo/hello.txt',
body: 'hello fastgpt',
contentType: 'text/plain; charset=utf-8',
metadata: {
app: 'fastgpt',
purpose: 'readme-demo'
}
});
// 3) 下载(流式)
const { body } = await storage.downloadObject({ key: 'demo/hello.txt' });
body.pipe(createWriteStream('/tmp/hello.txt'));
// 4) 删除
await storage.deleteObject({ key: 'demo/hello.txt' });
// 5) 释放资源(部分 adapter 可能是空实现)
await storage.destroy();
```
## 配置IStorageOptions
通过 `vendor` 字段选择适配器(判别联合),不同厂商的配置项在 `IStorageOptions` 上有清晰的类型约束与中文 JSDoc。
### AWS S3
```ts
import { createStorage } from '@fastgpt-sdk/storage';
const storage = createStorage({
vendor: 'aws-s3',
bucket: 'my-bucket',
region: 'ap-northeast-1',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? '',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? ''
}
});
```
### MinIO / 其他 S3 兼容
```ts
import { createStorage } from '@fastgpt-sdk/storage';
const storage = createStorage({
vendor: 'minio',
bucket: 'my-bucket',
region: 'us-east-1',
endpoint: 'http://127.0.0.1:9000',
credentials: {
accessKeyId: process.env.MINIO_ACCESS_KEY ?? '',
secretAccessKey: process.env.MINIO_SECRET_KEY ?? ''
},
forcePathStyle: true
});
```
### 阿里云 OSS
```ts
import { createStorage } from '@fastgpt-sdk/storage';
const storage = createStorage({
vendor: 'oss',
bucket: 'my-bucket',
region: 'oss-cn-hangzhou',
endpoint: process.env.OSS_ENDPOINT, // 视你的部署与 SDK 配置而定
credentials: {
accessKeyId: process.env.OSS_ACCESS_KEY_ID ?? '',
secretAccessKey: process.env.OSS_ACCESS_KEY_SECRET ?? ''
},
cname: false,
internal: false
});
```
### 腾讯云 COS
```ts
import { createStorage } from '@fastgpt-sdk/storage';
const storage = createStorage({
vendor: 'cos',
bucket: 'my-bucket',
region: 'ap-guangzhou',
credentials: {
accessKeyId: process.env.COS_SECRET_ID ?? '',
secretAccessKey: process.env.COS_SECRET_KEY ?? ''
},
protocol: 'https:',
useAccelerate: false
});
```
## APIIStorage
`createStorage(options)` 返回一个实现了 `IStorage` 的实例:
- **`ensureBucket()`**: 确保 bucket 存在(不存在时**可能**尝试创建,取决于 vendor 与权限;部分厂商仅做存在性校验并直接抛错)。
- **`checkObjectExists({ key })`**: 判断对象是否存在。
- **`uploadObject({ key, body, contentType?, contentLength?, contentDisposition?, metadata? })`**: 上传对象。
- **`downloadObject({ key })`**: 下载对象(返回 `Readable`)。
- **`deleteObject({ key })`**: 删除单个对象。
- **`deleteObjectsByMultiKeys({ keys })`**: 按 key 列表批量删除(返回失败 key 列表)。
- **`deleteObjectsByPrefix({ prefix })`**: 按前缀批量删除(高危,务必使用非空 prefix返回失败 key 列表)。
- **`generatePresignedPutUrl({ key, expiredSeconds?, metadata? })`**: 生成 **PUT** 预签名 URL用于前端直传
- **`generatePresignedGetUrl({ key, expiredSeconds? })`**: 生成 **GET** 预签名 URL用于临时授权下载
- **`listObjects({ prefix? })`**: 列出对象 key可按前缀过滤不传则列出整个 bucket 内对象)。
- **`getObjectMetadata({ key })`**: 获取对象元数据。
- **`destroy()`**: 资源清理/连接释放。
> 重要:当前实现状态(以代码为准):
> - `generatePresignedPutUrl`**AWS S3 / MinIO / COS / OSS 已实现**。
> - `generatePresignedGetUrl`:目前各 adapter 仍为 **未实现**(会抛 `Error('Method not implemented.')`)。
### 预签名 PUT 直传示例(浏览器 / 前端)
`generatePresignedPutUrl` 返回的 `metadata` 字段语义更接近“需要带上的 headers”不同厂商前缀不同`x-oss-meta-*` / `x-cos-meta-*`)。
```ts
const { putUrl, metadata } = await storage.generatePresignedPutUrl({
key: 'demo/hello.txt',
expiredSeconds: 600,
metadata: { app: 'fastgpt', purpose: 'direct-upload' }
});
await fetch(putUrl, {
method: 'PUT',
headers: {
// 将 adapter 返回的 headers 带上(若为空对象也没关系)
...metadata,
'content-type': 'text/plain; charset=utf-8'
},
body: 'hello fastgpt'
});
```
## 错误与异常
导出的错误类型:
- **`NoSuchBucketError`**: bucket 不存在(部分 adapter 会用它包装底层错误)。
- **`NoBucketReadPermissionError`**: bucket 无读取权限(部分 adapter 会用它包装底层错误)。
- **`EmptyObjectError`**: 下载时对象为空(例如底层 SDK 返回 `Body` 为空)。
建议你在业务层做分层处理:可恢复错误(重试/提示权限)与不可恢复错误(配置错误/接口未实现)。
## 注意事项
- **按前缀删除是高危操作**`prefix` 必须是非空字符串;强烈建议使用业务隔离前缀(例如 `team/{teamId}/`),避免误删整桶。
- **metadata 厂商差异**:不同厂商对元数据 key 前缀/大小写/可用字符/大小限制不同,建议使用简单 ASCII key并控制总体大小。
- **流式下载/上传**:大文件建议使用 `Readable`,减少内存峰值。
## 开发与构建
```bash
pnpm -C FastGPT/packages/storage dev
pnpm -C FastGPT/packages/storage build
```
发布前会执行 `prepublishOnly` 自动构建产物到 `dist/`

View File

@ -0,0 +1,62 @@
{
"name": "@fastgpt-sdk/storage",
"private": false,
"version": "0.5.1",
"type": "module",
"packageManager": "pnpm@9.15.9",
"description": "FastGPT SDK for object storage",
"author": "FastGPT",
"repository": {
"type": "git",
"url": "https://github.com/labring/FastGPT.git",
"directory": "FastGPT/packages/storage"
},
"homepage": "https://github.com/labring/FastGPT",
"bugs": {
"url": "https://github.com/labring/FastGPT/issues"
},
"publishConfig": {
"access": "public"
},
"engines": {
"node": ">=20"
},
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"files": [
"dist"
],
"sideEffects": false,
"license": "MIT",
"keywords": [
"object-storage",
"storage",
"aws-s3",
"cos",
"minio",
"oss"
],
"scripts": {
"build": "tsdown",
"dev": "tsdown --watch",
"prepublishOnly": "pnpm build"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.948.0",
"@aws-sdk/lib-storage": "^3.948.0",
"@aws-sdk/s3-request-presigner": "^3.952.0",
"ali-oss": "^6.23.0",
"cos-nodejs-sdk-v5": "^2.15.4",
"es-toolkit": "^1.43.0"
},
"devDependencies": {
"@types/ali-oss": "^6.16.13",
"@types/node": "^20",
"tsdown": "^0.17.4",
"typescript": "^5.9.3"
}
}

View File

@ -0,0 +1,426 @@
import {
CopyObjectCommand,
DeleteObjectCommand,
DeleteObjectsCommand,
GetObjectCommand,
HeadBucketCommand,
HeadObjectCommand,
ListObjectsV2Command,
NotFound,
PutObjectCommand,
S3Client
} from '@aws-sdk/client-s3';
import type { IAwsS3CompatibleStorageOptions, IStorage } from '../interface';
import type {
UploadObjectParams,
UploadObjectResult,
DownloadObjectParams,
DownloadObjectResult,
DeleteObjectParams,
DeleteObjectsParams,
DeleteObjectsResult,
PresignedPutUrlParams,
PresignedPutUrlResult,
ListObjectsParams,
ListObjectsResult,
DeleteObjectResult,
GetObjectMetadataParams,
GetObjectMetadataResult,
EnsureBucketResult,
DeleteObjectsByPrefixParams,
StorageObjectKey,
ExistsObjectParams,
ExistsObjectResult,
StorageObjectMetadata,
PresignedGetUrlParams,
PresignedGetUrlResult,
CopyObjectParams,
CopyObjectResult,
GeneratePublicGetUrlParams,
GeneratePublicGetUrlResult
} from '../types';
import { Upload } from '@aws-sdk/lib-storage';
import { EmptyObjectError } from '../errors';
import type { Readable } from 'node:stream';
import { camelCase, chunk, isNotNil, kebabCase } from 'es-toolkit';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS } from '../constants';
export class AwsS3StorageAdapter implements IStorage {
protected readonly client: S3Client;
get bucketName(): string {
return this.options.bucket;
}
constructor(protected readonly options: IAwsS3CompatibleStorageOptions) {
if (options.vendor !== 'aws-s3' && options.vendor !== 'minio') {
throw new Error('Invalid storage vendor');
}
this.client = new S3Client({
region: options.region,
credentials: options.credentials,
endpoint: options.endpoint,
forcePathStyle: options.forcePathStyle,
maxAttempts: options.maxRetries
});
}
async checkObjectExists(params: ExistsObjectParams): Promise<ExistsObjectResult> {
const { key } = params;
let exists = false;
try {
await this.client.send(
new HeadObjectCommand({
Bucket: this.options.bucket,
Key: key
})
);
exists = true;
} catch (error) {
if (error instanceof NotFound) {
exists = false;
} else {
throw error;
}
}
return {
key,
exists,
bucket: this.options.bucket
};
}
async getObjectMetadata(params: GetObjectMetadataParams): Promise<GetObjectMetadataResult> {
const { key } = params;
const result = await this.client.send(
new HeadObjectCommand({
Bucket: this.options.bucket,
Key: key
})
);
let metadata: StorageObjectMetadata = {};
if (result.Metadata) {
for (const [k, v] of Object.entries(result.Metadata)) {
if (!k) continue;
metadata[camelCase(k)] = String(v);
}
}
return {
key,
metadata,
etag: result.ETag,
bucket: this.options.bucket,
contentType: result.ContentType,
contentLength: result.ContentLength
};
}
async ensureBucket(): Promise<EnsureBucketResult> {
await this.client.send(new HeadBucketCommand({ Bucket: this.options.bucket }));
return {
exists: true,
created: false,
bucket: this.options.bucket
};
}
async uploadObject(params: UploadObjectParams): Promise<UploadObjectResult> {
const { key, body, contentType, contentLength, contentDisposition, metadata } = params;
let meta: StorageObjectMetadata = {};
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
meta[kebabCase(k)] = String(v);
}
}
const upload = new Upload({
client: this.client,
params: {
Bucket: this.options.bucket,
Key: key,
Body: body,
ContentType: contentType,
ContentLength: contentLength,
ContentDisposition: contentDisposition,
Metadata: meta
}
});
await upload.done();
return {
key,
bucket: this.options.bucket
};
}
async downloadObject(params: DownloadObjectParams): Promise<DownloadObjectResult> {
const { key } = params;
const result = await this.client.send(
new GetObjectCommand({
Bucket: this.options.bucket,
Key: key
})
);
if (!result.Body) {
throw new EmptyObjectError('Object is undefined');
}
return {
key,
bucket: this.options.bucket,
body: result.Body as Readable
};
}
async deleteObject(params: DeleteObjectParams): Promise<DeleteObjectResult> {
const { key } = params;
await this.client.send(
new DeleteObjectCommand({
Key: key,
Bucket: this.options.bucket
})
);
return {
key,
bucket: this.options.bucket
};
}
async deleteObjectsByMultiKeys(params: DeleteObjectsParams): Promise<DeleteObjectsResult> {
const { keys } = params;
if (keys.length === 0) {
return {
bucket: this.options.bucket,
keys: []
};
}
const chunks = chunk(keys, 1000);
const fails: StorageObjectKey[] = [];
for (const chunk of chunks) {
const result = await this.client.send(
new DeleteObjectsCommand({
Bucket: this.options.bucket,
Delete: {
Objects: chunk.map((key) => ({ Key: key })),
Quiet: true
}
})
);
fails.push(...(result.Errors?.map((error) => error.Key).filter(isNotNil) ?? []));
}
return {
bucket: this.options.bucket,
keys: fails
};
}
async deleteObjectsByPrefix(params: DeleteObjectsByPrefixParams): Promise<DeleteObjectsResult> {
const { prefix } = params;
if (!prefix) {
throw new Error('Prefix is required');
}
let fails: StorageObjectKey[] = [];
let isTruncated = false;
let continuationToken: string | undefined = undefined;
do {
const listResponse = await this.client.send(
new ListObjectsV2Command({
Bucket: this.options.bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
MaxKeys: 1000
})
);
if (!listResponse.Contents || listResponse.Contents.length === 0) {
return {
bucket: this.options.bucket,
keys: []
};
}
const objectsToDelete = listResponse.Contents.map((content) => ({ Key: content.Key }));
const deleteResponse = await this.client.send(
new DeleteObjectsCommand({
Bucket: this.options.bucket,
Delete: {
Objects: objectsToDelete,
Quiet: true
}
})
);
fails.push(...(deleteResponse.Errors?.map((error) => error.Key).filter(isNotNil) ?? []));
isTruncated = listResponse.IsTruncated ?? false;
continuationToken = listResponse.NextContinuationToken as string | undefined;
} while (isTruncated);
return {
bucket: this.options.bucket,
keys: fails
};
}
async generatePresignedPutUrl(params: PresignedPutUrlParams): Promise<PresignedPutUrlResult> {
const { key, expiredSeconds, metadata, contentType } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
// For S3-compatible vendors, metadata is carried by `x-amz-meta-*` headers.
// We return the expected header map so callers can do browser direct-upload with the same metadata.
const meta: Record<string, string> = {};
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
meta[kebabCase(k)] = String(v);
}
}
if (contentType) {
meta['Content-Type'] = contentType;
}
const url = await getSignedUrl(
this.client,
new PutObjectCommand({
Bucket: this.options.bucket,
Key: key,
Metadata: meta
}),
{
expiresIn
}
);
return {
key,
putUrl: url,
bucket: this.options.bucket,
metadata: {
'Content-Type': meta['Content-Type']
}
};
}
async generatePresignedGetUrl(params: PresignedGetUrlParams): Promise<PresignedGetUrlResult> {
const { key, expiredSeconds } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
const url = await getSignedUrl(
this.client,
new GetObjectCommand({
Bucket: this.options.bucket,
Key: key
}),
{
expiresIn
}
);
return {
key,
getUrl: url,
bucket: this.options.bucket
};
}
generatePublicGetUrl(params: GeneratePublicGetUrlParams): GeneratePublicGetUrlResult {
const { key } = params;
let url: string;
if (this.options.forcePathStyle) {
url = `${this.options.endpoint}/${this.options.bucket}/${key}`;
} else {
const endpoint = new URL(this.options.endpoint);
url = `${endpoint.protocol}//${this.options.bucket}.${endpoint.host}/${key}`;
}
return {
key,
publicGetUrl: url,
bucket: this.options.bucket
};
}
async listObjects(params: ListObjectsParams): Promise<ListObjectsResult> {
const { prefix } = params;
let keys: StorageObjectKey[] = [];
let isTruncated = false;
let continuationToken: string | undefined = undefined;
do {
const result = await this.client.send(
new ListObjectsV2Command({
Bucket: this.options.bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
MaxKeys: 1000
})
);
if (!result.Contents || result.Contents.length === 0) {
return {
bucket: this.options.bucket,
keys
};
}
keys = keys.concat(result.Contents.map((content) => content.Key).filter(isNotNil));
isTruncated = result.IsTruncated ?? false;
continuationToken = result.NextContinuationToken as string | undefined;
} while (isTruncated);
return {
bucket: this.options.bucket,
keys
};
}
async copyObjectInSelfBucket(params: CopyObjectParams): Promise<CopyObjectResult> {
const { sourceKey, targetKey } = params;
await this.client.send(
new CopyObjectCommand({
Bucket: this.options.bucket,
CopySource: `${this.options.bucket}/${sourceKey}`,
Key: targetKey
})
);
return {
bucket: this.options.bucket,
sourceKey,
targetKey
};
}
async destroy(): Promise<void> {
this.client.destroy();
}
}

View File

@ -0,0 +1,501 @@
import COS from 'cos-nodejs-sdk-v5';
import type { ICosStorageOptions, IStorage } from '../interface';
import type {
UploadObjectParams,
UploadObjectResult,
DownloadObjectParams,
DownloadObjectResult,
DeleteObjectParams,
DeleteObjectsParams,
DeleteObjectsResult,
PresignedPutUrlParams,
PresignedPutUrlResult,
ListObjectsParams,
ListObjectsResult,
DeleteObjectResult,
GetObjectMetadataParams,
GetObjectMetadataResult,
EnsureBucketResult,
DeleteObjectsByPrefixParams,
StorageObjectKey,
ExistsObjectParams,
ExistsObjectResult,
StorageObjectMetadata,
PresignedGetUrlParams,
PresignedGetUrlResult,
CopyObjectParams,
CopyObjectResult,
GeneratePublicGetUrlParams,
GeneratePublicGetUrlResult
} from '../types';
import { PassThrough } from 'node:stream';
import { camelCase, isError, isNotNil, kebabCase } from 'es-toolkit';
import { DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS } from '../constants';
export class CosStorageAdapter implements IStorage {
protected readonly client: COS;
get bucketName(): string {
return this.options.bucket;
}
constructor(protected readonly options: ICosStorageOptions) {
if (options.vendor !== 'cos') {
throw new Error('Invalid storage vendor');
}
this.client = new COS({
SecretId: options.credentials.accessKeyId,
SecretKey: options.credentials.secretAccessKey,
UseAccelerate: options.useAccelerate,
Protocol: options.protocol,
Domain: options.domain
});
}
async checkObjectExists(params: ExistsObjectParams): Promise<ExistsObjectResult> {
const { key } = params;
let exists = false;
await new Promise<void>((resolve, reject) => {
this.client.headObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: key
},
function (err, _data) {
if (err && err.statusCode === 404) {
exists = false;
return resolve();
}
if (err) {
return reject(err);
}
exists = true;
resolve();
}
);
});
return {
key,
exists,
bucket: this.options.bucket
};
}
async getObjectMetadata(params: GetObjectMetadataParams): Promise<GetObjectMetadataResult> {
const { key } = params;
const result = await new Promise<COS.HeadObjectResult>((resolve, reject) => {
this.client.headObject(
{
Key: key,
Bucket: this.options.bucket,
Region: this.options.region
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
let metadata: StorageObjectMetadata = {};
if (result.headers) {
Object.entries(result.headers).forEach(([key, val]) => {
if (key.startsWith('x-cos-meta-')) {
metadata[camelCase(key.replace('x-cos-meta-', ''))] = String(val);
}
});
}
return {
metadata,
key,
etag: result.ETag,
bucket: this.options.bucket,
contentType: result.headers?.['content-type'],
contentLength: result.headers?.['content-length']
? Number(result.headers['content-length'])
: undefined
};
}
async ensureBucket(): Promise<EnsureBucketResult> {
await new Promise<COS.HeadBucketResult>((resolve, reject) => {
this.client.headBucket(
{
Bucket: this.options.bucket,
Region: this.options.region
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
return {
exists: true,
created: false,
bucket: this.options.bucket
};
}
async uploadObject(params: UploadObjectParams): Promise<UploadObjectResult> {
const { key, body, contentType, contentLength, contentDisposition, metadata } = params;
const headers: Record<string, string> = {};
if (contentDisposition) headers['Content-Disposition'] = contentDisposition;
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
headers[`x-cos-meta-${kebabCase(k)}`] = String(v);
}
}
await new Promise<COS.PutObjectResult>((resolve, reject) => {
this.client.putObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: key,
Body: body,
ContentType: contentType,
ContentLength: contentLength,
Headers: Object.keys(headers).length ? headers : undefined
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
return {
key,
bucket: this.options.bucket
};
}
async downloadObject(params: DownloadObjectParams): Promise<DownloadObjectResult> {
const passThrough = new PassThrough();
this.client.getObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: params.key,
Output: passThrough
},
function (err, _data) {
if (err) {
passThrough.destroy(isError(err.error) ? err.error : new Error(err.message));
}
}
);
return {
bucket: this.options.bucket,
key: params.key,
body: passThrough
};
}
async deleteObject(params: DeleteObjectParams): Promise<DeleteObjectResult> {
const { key } = params;
await new Promise<COS.DeleteObjectResult>((resolve, reject) => {
this.client.deleteObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: key
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
return {
key,
bucket: this.options.bucket
};
}
async deleteObjectsByMultiKeys(params: DeleteObjectsParams): Promise<DeleteObjectsResult> {
const { keys } = params;
const result = await new Promise<COS.DeleteMultipleObjectResult>((resolve, reject) => {
this.client.deleteMultipleObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Objects: keys.map((key) => ({ Key: key }))
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
return {
keys: result.Error.map((e) => e.Key).filter(isNotNil),
bucket: this.options.bucket
};
}
async deleteObjectsByPrefix(params: DeleteObjectsByPrefixParams): Promise<DeleteObjectsResult> {
const { prefix } = params;
if (!prefix) {
throw new Error('Prefix is required');
}
const fails: StorageObjectKey[] = [];
let marker: string | undefined = undefined;
await new Promise<void>((resolve, reject) => {
const handler = () => {
this.client.getBucket(
{
Bucket: this.options.bucket,
Region: this.options.region,
Prefix: prefix,
MaxKeys: 1000,
Marker: marker
},
(listErr, listData) => {
if (listErr) {
return reject(listErr);
}
if (!listData.Contents || listData.Contents.length === 0) {
return resolve();
}
const objectsToDelete = listData.Contents.map((content) => ({ Key: content.Key }));
this.client.deleteMultipleObject(
{
Bucket: this.options.bucket,
Region: this.options.region,
Objects: objectsToDelete
},
function (deleteErr, deleteData) {
if (deleteErr) {
fails.push(...objectsToDelete.map((content) => content.Key));
if (listData.IsTruncated === 'true') {
marker = listData.NextMarker;
return handler();
}
return resolve();
}
fails.push(...deleteData.Error.map((e) => e.Key).filter(isNotNil));
if (listData.IsTruncated === 'true') {
marker = listData.NextMarker;
return handler();
}
resolve();
}
);
}
);
};
handler();
});
return {
bucket: this.options.bucket,
keys: fails
};
}
async generatePresignedPutUrl(params: PresignedPutUrlParams): Promise<PresignedPutUrlResult> {
const { key, expiredSeconds, metadata, contentType } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
const meta: Record<string, string> = {};
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
meta[`x-cos-meta-${kebabCase(k)}`] = String(v);
}
}
if (contentType) {
meta['Content-Type'] = contentType;
}
const url = await new Promise<string>((resolve, reject) => {
this.client.getObjectUrl(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: key,
Expires: expiresIn,
Sign: true,
Method: 'PUT'
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data.Url);
}
);
});
return {
key,
putUrl: url,
bucket: this.options.bucket,
metadata: meta
};
}
async generatePresignedGetUrl(params: PresignedGetUrlParams): Promise<PresignedGetUrlResult> {
const { key, expiredSeconds } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
const url = await new Promise<string>((resolve, reject) => {
this.client.getObjectUrl(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: key,
Expires: expiresIn,
Sign: true,
Method: 'GET'
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data.Url);
}
);
});
return {
key,
getUrl: url,
bucket: this.options.bucket
};
}
generatePublicGetUrl(params: GeneratePublicGetUrlParams): GeneratePublicGetUrlResult {
const { key } = params;
let url: string;
if (this.options.domain) {
url = `${this.options.protocol}//${this.options.domain}/${key}`;
} else {
url = `${this.options.protocol}//${this.options.bucket}.cos.${this.options.region}.myqcloud.com/${key}`;
}
return {
key,
publicGetUrl: url,
bucket: this.options.bucket
};
}
async listObjects(params: ListObjectsParams): Promise<ListObjectsResult> {
const { prefix } = params;
let keys: StorageObjectKey[] = [];
let marker: string | undefined = undefined;
await new Promise<void>((resolve, reject) => {
const handler = () => {
this.client.getBucket(
{
Bucket: this.options.bucket,
Region: this.options.region,
Prefix: prefix,
Marker: marker,
MaxKeys: 1000
},
function (err, data) {
if (err) {
return reject(err);
}
keys = keys.concat(data.Contents?.map((content) => content.Key).filter(isNotNil) ?? []);
if (data.IsTruncated === 'true') {
marker = data.NextMarker;
return handler();
}
resolve();
}
);
};
handler();
});
return {
keys,
bucket: this.options.bucket
};
}
async copyObjectInSelfBucket(params: CopyObjectParams): Promise<CopyObjectResult> {
const { sourceKey, targetKey } = params;
await new Promise<COS.SliceCopyFileResult>((resolve, reject) => {
const copySource = `${this.options.bucket}.cos.${this.options.region}.myqcloud.com/${sourceKey}`;
this.client.sliceCopyFile(
{
Bucket: this.options.bucket,
Region: this.options.region,
Key: targetKey,
CopySource: copySource
},
function (err, data) {
if (err) {
return reject(err);
}
resolve(data);
}
);
});
return {
bucket: this.options.bucket,
sourceKey,
targetKey
};
}
async destroy(): Promise<void> {}
}

View File

@ -0,0 +1,60 @@
import { AwsS3StorageAdapter } from './aws-s3.adapter';
import type { IAwsS3CompatibleStorageOptions, IStorage } from '../interface';
import type { EnsureBucketResult } from '../types';
import { CreateBucketCommand, NotFound, PutBucketPolicyCommand } from '@aws-sdk/client-s3';
/**
*
* - forcePathStyle 使 path style URLs
* - MinIO self-hosted
*/
export class MinioStorageAdapter extends AwsS3StorageAdapter implements IStorage {
constructor(protected readonly options: IAwsS3CompatibleStorageOptions) {
if (options.vendor !== 'minio') {
throw new Error('Invalid storage vendor');
}
options.forcePathStyle = true;
super(options);
}
async ensureBucket(): Promise<EnsureBucketResult> {
try {
return await super.ensureBucket();
} catch (error) {
if (!(error instanceof NotFound)) {
throw error;
}
await this.client.send(new CreateBucketCommand({ Bucket: this.options.bucket }));
return {
exists: false,
created: true,
bucket: this.options.bucket
};
}
}
async ensurePublicBucketPolicy(): Promise<void> {
const policy = {
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Principal: '*',
Action: ['s3:GetObject'],
Resource: [`arn:aws:s3:::${this.options.bucket}/*`]
}
]
};
await this.client.send(
new PutBucketPolicyCommand({
Bucket: this.options.bucket,
Policy: JSON.stringify(policy)
})
);
}
}

View File

@ -0,0 +1,357 @@
import OSS from 'ali-oss';
import type { IOssStorageOptions, IStorage } from '../interface';
import type {
UploadObjectParams,
UploadObjectResult,
DownloadObjectParams,
DownloadObjectResult,
DeleteObjectParams,
DeleteObjectsParams,
DeleteObjectsResult,
PresignedPutUrlParams,
PresignedPutUrlResult,
ListObjectsParams,
ListObjectsResult,
DeleteObjectResult,
GetObjectMetadataParams,
GetObjectMetadataResult,
EnsureBucketResult,
DeleteObjectsByPrefixParams,
StorageObjectKey,
ExistsObjectParams,
ExistsObjectResult,
StorageObjectMetadata,
PresignedGetUrlParams,
PresignedGetUrlResult,
CopyObjectParams,
CopyObjectResult,
GeneratePublicGetUrlParams,
GeneratePublicGetUrlResult
} from '../types';
import type { Readable } from 'node:stream';
import { camelCase, difference, kebabCase } from 'es-toolkit';
import { DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS } from '../constants';
export class OosStorageAdapter implements IStorage {
protected readonly client: OSS;
constructor(protected readonly options: IOssStorageOptions) {
if (options.vendor !== 'oss') {
throw new Error('Invalid storage vendor');
}
this.client = new OSS({
accessKeyId: options.credentials.accessKeyId,
accessKeySecret: options.credentials.secretAccessKey,
region: options.region,
endpoint: options.endpoint,
bucket: options.bucket,
cname: options.cname,
internal: options.internal,
secure: options.secure,
// @ts-expect-error ali-oss SDK 类型未定义但存在此属性
enableProxy: options.proxy ? true : false
});
}
get bucketName(): string {
return this.options.bucket;
}
async checkObjectExists(params: ExistsObjectParams): Promise<ExistsObjectResult> {
const { key } = params;
let exists = false;
try {
await this.client.head(key);
exists = true;
} catch (error: any) {
if (error?.code === 'NoSuchKey') {
exists = false;
} else {
throw error;
}
}
return {
key,
exists,
bucket: this.options.bucket
};
}
async getObjectMetadata(params: GetObjectMetadataParams): Promise<GetObjectMetadataResult> {
const { key } = params;
const result = await this.client.head(key);
let metadata: StorageObjectMetadata = {};
if (result.meta) {
for (const [k, v] of Object.entries(result.meta)) {
if (!k) continue;
metadata[camelCase(k)] = String(v);
}
}
const headers = result.res.headers as Record<string, string>;
return {
key,
metadata,
etag: result.meta?.etag as string,
bucket: this.options.bucket,
contentType: headers['content-type'],
contentLength: headers['content-length'] ? Number(headers['content-length']) : undefined
};
}
async ensureBucket(): Promise<EnsureBucketResult> {
await this.client.getBucketInfo(this.options.bucket);
return {
exists: true,
created: false,
bucket: this.options.bucket
};
}
async uploadObject(params: UploadObjectParams): Promise<UploadObjectResult> {
const { key, body, contentType, contentLength, contentDisposition, metadata } = params;
const headers: Record<string, any> = {
'x-oss-storage-class': 'Standard',
'x-oss-forbid-overwrite': 'false'
};
if (contentType) headers['Content-Type'] = contentType;
if (contentLength !== undefined) headers['Content-Length'] = String(contentLength);
if (contentDisposition) headers['Content-Disposition'] = contentDisposition;
let meta = {} as StorageObjectMetadata & OSS.UserMeta;
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
meta[kebabCase(k)] = String(v);
}
}
await this.client.put(key, body, {
headers,
mime: contentType,
meta
});
return {
key,
bucket: this.options.bucket
};
}
async downloadObject(params: DownloadObjectParams): Promise<DownloadObjectResult> {
const { key } = params;
const result = await this.client.getStream(key);
return {
key,
bucket: this.options.bucket,
body: result.stream as Readable
};
}
async deleteObject(params: DeleteObjectParams): Promise<DeleteObjectResult> {
const { key } = params;
await this.client.delete(key);
return {
bucket: this.options.bucket,
key
};
}
async deleteObjectsByMultiKeys(params: DeleteObjectsParams): Promise<DeleteObjectsResult> {
const { keys } = params;
const result = await this.client.deleteMulti(keys, { quiet: true });
return {
bucket: this.options.bucket,
keys: difference(keys, result.deleted ?? [])
};
}
async deleteObjectsByPrefix(params: DeleteObjectsByPrefixParams): Promise<DeleteObjectsResult> {
const { prefix } = params;
if (!prefix) {
throw new Error('Prefix is required');
}
const fails: StorageObjectKey[] = [];
let marker: string | undefined = undefined;
let isTruncated = false;
do {
const listResponse = await this.client.list(
{
prefix,
'max-keys': 1000,
marker
},
{
timeout: 60000
}
);
if (!listResponse.objects || listResponse.objects.length === 0) {
return {
bucket: this.options.bucket,
keys: []
};
}
const objectsToDelete = listResponse.objects.map((object) => object.name);
const deleteResponse = await this.deleteObjectsByMultiKeys({ keys: objectsToDelete });
fails.push(...deleteResponse.keys);
isTruncated = listResponse.isTruncated ?? false;
marker = listResponse.nextMarker;
} while (isTruncated);
return {
bucket: this.options.bucket,
keys: fails
};
}
async generatePresignedPutUrl(params: PresignedPutUrlParams): Promise<PresignedPutUrlResult> {
const { key, expiredSeconds, metadata, contentType } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
const headersToSign: Record<string, string> = {};
if (metadata) {
for (const [k, v] of Object.entries(metadata)) {
if (!k) continue;
headersToSign[`x-oss-meta-${kebabCase(k)}`] = String(v);
}
}
if (contentType) {
headersToSign['Content-Type'] = contentType;
}
// @ts-expect-error ali-oss SDK 类型未定义但存在此方法
// @see https://github.com/ali-sdk/ali-oss?tab=readme-ov-file#signatureurlv4method-expires-request-objectname-additionalheaders
const url = await this.client.signatureUrlV4(
'PUT',
expiresIn,
{
headers: {
...headersToSign
}
},
key
);
return {
key,
putUrl: url,
bucket: this.options.bucket,
metadata: headersToSign
};
}
async generatePresignedGetUrl(params: PresignedGetUrlParams): Promise<PresignedGetUrlResult> {
const { key, expiredSeconds } = params;
const expiresIn = expiredSeconds ? expiredSeconds : DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS;
const url = this.client.signatureUrl(key, {
method: 'GET',
expires: expiresIn
});
return {
key,
getUrl: url,
bucket: this.options.bucket
};
}
generatePublicGetUrl(params: GeneratePublicGetUrlParams): GeneratePublicGetUrlResult {
const { key } = params;
let protocol = 'https:';
if (!this.options.secure) {
protocol = 'http:';
}
let url: string;
if (this.options.cname) {
url = `${protocol}//${this.options.endpoint}/${key}`;
} else {
url = `${protocol}//${this.options.bucket}.${this.options.region}.aliyuncs.com/${key}`;
}
return {
key,
publicGetUrl: url,
bucket: this.options.bucket
};
}
async listObjects(params: ListObjectsParams): Promise<ListObjectsResult> {
const { prefix } = params;
let keys: StorageObjectKey[] = [];
let marker: string | undefined = undefined;
let isTruncated = false;
do {
const listResponse = await this.client.list(
{
prefix,
'max-keys': 1000,
marker
},
{
timeout: 60000
}
);
if (!listResponse.objects || listResponse.objects.length === 0) {
return {
bucket: this.options.bucket,
keys: []
};
}
keys = keys.concat(listResponse.objects.map((object) => object.name));
isTruncated = listResponse.isTruncated ?? false;
marker = listResponse.nextMarker;
} while (isTruncated);
return {
keys,
bucket: this.options.bucket
};
}
async copyObjectInSelfBucket(params: CopyObjectParams): Promise<CopyObjectResult> {
const { sourceKey, targetKey } = params;
await this.client.copy(sourceKey, targetKey);
return {
bucket: this.options.bucket,
sourceKey,
targetKey
};
}
async destroy(): Promise<void> {}
}
// Backward compatible export name fix: OSS adapter (typo alias).
export { OosStorageAdapter as OssStorageAdapter };

View File

@ -0,0 +1,4 @@
/**
* URL
*/
export const DEFAULT_PRESIGNED_URL_EXPIRED_SECONDS = 1800;

View File

@ -0,0 +1,20 @@
export class NoSuchBucketError extends Error {
constructor(message: string) {
super(message);
this.name = 'NoSuchBucketError';
}
}
export class NoBucketReadPermissionError extends Error {
constructor(message: string) {
super(message);
this.name = 'NoBucketReadPermissionError';
}
}
export class EmptyObjectError extends Error {
constructor(message: string) {
super(message);
this.name = 'EmptyObjectError';
}
}

View File

@ -0,0 +1,29 @@
import { AwsS3StorageAdapter } from './adapters/aws-s3.adapter';
import { CosStorageAdapter } from './adapters/cos.adapter';
import { MinioStorageAdapter } from './adapters/minio.adapter';
import { OssStorageAdapter } from './adapters/oss.adapter';
import type { IStorage, IStorageOptions } from './interface';
export function createStorage(options: IStorageOptions): IStorage {
switch (options.vendor) {
case 'aws-s3': {
return new AwsS3StorageAdapter(options);
}
case 'oss': {
return new OssStorageAdapter(options);
}
case 'cos': {
return new CosStorageAdapter(options);
}
case 'minio': {
return new MinioStorageAdapter(options);
}
default: {
throw new Error(`Unsupported storage vendor: ${String((options as any)?.vendor)}`);
}
}
}

View File

@ -0,0 +1,40 @@
export { createStorage } from './factory';
export type {
IStorage,
IStorageOptions,
IAwsS3CompatibleStorageOptions,
IOssStorageOptions,
ICosStorageOptions,
ICommonStorageOptions
} from './interface';
export type {
StorageBucketName,
StorageObjectKey,
StorageObjectMetadata,
StorageUploadBody,
EnsureBucketResult,
ExistsObjectParams,
ExistsObjectResult,
UploadObjectParams,
UploadObjectResult,
DownloadObjectParams,
DownloadObjectResult,
DeleteObjectParams,
DeleteObjectResult,
DeleteObjectsParams,
DeleteObjectsResult,
DeleteObjectsByPrefixParams,
PresignedPutUrlParams,
PresignedPutUrlResult,
PresignedGetUrlParams,
PresignedGetUrlResult,
ListObjectsParams,
ListObjectsResult,
GetObjectMetadataParams,
GetObjectMetadataResult
} from './types';
export { NoSuchBucketError, NoBucketReadPermissionError, EmptyObjectError } from './errors';
export { AwsS3StorageAdapter } from './adapters/aws-s3.adapter';
export { CosStorageAdapter } from './adapters/cos.adapter';
export { MinioStorageAdapter } from './adapters/minio.adapter';
export { OosStorageAdapter } from './adapters/oss.adapter';

View File

@ -0,0 +1,381 @@
import type {
DeleteObjectParams,
DeleteObjectResult,
DeleteObjectsParams,
DeleteObjectsResult,
DeleteObjectsByPrefixParams,
DownloadObjectParams,
DownloadObjectResult,
EnsureBucketResult,
GetObjectMetadataParams,
GetObjectMetadataResult,
ListObjectsParams,
ListObjectsResult,
PresignedPutUrlParams,
PresignedPutUrlResult,
UploadObjectParams,
UploadObjectResult,
ExistsObjectParams,
ExistsObjectResult,
PresignedGetUrlParams,
PresignedGetUrlResult,
CopyObjectParams,
CopyObjectResult,
GeneratePublicGetUrlParams,
GeneratePublicGetUrlResult
} from './types';
/**
*
*
*
* - S3/OSS/COS/MinIO
* - `bucket` / `region` / `credentials`
* - `endpoint` S3 /
*
*
* - `region` `region` `location` `region`
* adapter
*/
export interface ICommonStorageOptions {
/**
* Bucket
*
* - 使//线
* - bucket bucket
*/
bucket: string;
/**
* Region / Location
*
* - AWS: 例如 `ap-northeast-1`
* - COS: 例如 `ap-guangzhou`
* - OSS: 例如 `oss-cn-hangzhou`
*/
region: string;
/**
* 访AK/SK
*
*
* - `accessKeyId` / `secretAccessKey`
* - /
*/
credentials: {
/** AccessKeyId / SecretId / AK */
accessKeyId: string;
/** SecretAccessKey / SecretKey / SK */
secretAccessKey: string;
};
}
/**
* AWS S3
*
*
* - AWS S3
* - MinIO S3
*
*
* - `vendor` discriminated union便
*/
export interface IAwsS3CompatibleStorageOptions extends ICommonStorageOptions {
/**
* S3
*
* - `aws-s3`: AWS S3
* - `minio`: MinIOS3
*/
vendor: 'aws-s3' | 'minio';
/**
* Endpoint
*
*
* - MinIO / S3 `http(s)://host:port`
* -
*
* adapter
*/
endpoint: string;
/**
* 使 Path-Style 访`/{bucket}/{key}`
*
*
* - S3 virtual-hosted-style`{bucket}.endpoint/{key}` path-style`endpoint/{bucket}/{key}`
* - / virtual-hosted-style
*/
forcePathStyle?: boolean;
/**
*
*
*
* - adapter 使 SDK adapter
* - SDK 使
*/
maxRetries?: number;
}
/**
* OSS
*/
export interface IOssStorageOptions extends ICommonStorageOptions {
/** 存储厂商标识OSS。 */
vendor: 'oss';
/**
* 使CNAME
*
* - endpoint/host
* - OSS SDK adapter
*/
cname?: boolean;
/**
* Endpoint
*
*
* - MinIO / S3 `http(s)://host:port`
* -
*
* adapter
*/
endpoint?: string;
/**
* 使 HTTPS
*
*
* - 使 HTTPS
* - false使 HTTP
*/
secure?: boolean;
/**
*
*/
enableProxy?: boolean;
/**
* internal endpoint
*
* -
* - trueadapter internal endpoint
*/
internal?: boolean;
}
/**
* COS
*/
export interface ICosStorageOptions extends ICommonStorageOptions {
/** 存储厂商标识COS。 */
vendor: 'cos';
/**
*
*
* - `https:`
* - /使 `http:`
*/
protocol?: 'http:' | 'https:';
/**
*
*
*
* - COS /
* - endpoint/ adapter
*/
useAccelerate?: boolean;
/**
*
*
*
* - `https://{bucket}.cos.${region}.myqcloud.com`
* -
*/
domain?: string;
/**
*
*
*
* - HTTP 访 `http://127.0.0.1:8080`
*/
proxy?: string;
}
/**
* BOS
*
*
* - `IStorageOptions` /
* - 便
*/
export interface IBosStorageOptions extends ICommonStorageOptions {
/** 存储厂商标识BOS。 */
vendor: 'bos';
}
/**
*
*
*
* - `vendor` adapter
*
*
* - `aws-s3`/`minio`/`oss`/`cos`
*/
export type IStorageOptions =
| IAwsS3CompatibleStorageOptions
| IOssStorageOptions
| ICosStorageOptions;
/**
* vendor-agnostic
*
*
* - `key` bucket `a/b/c.txt`
* - 便
*/
export interface IStorage {
bucketName: string;
/**
* **** vendor
*
*
* - `exists`: bucket
* - `created`: bucket
*
*
* - OSS/COS / bucket//
* bucket SDK
*/
ensureBucket(): Promise<EnsureBucketResult>;
/**
*
*
*
* - `exists`:
*/
checkObjectExists(params: ExistsObjectParams): Promise<ExistsObjectResult>;
/**
* bucket
*
*
* - Buffer/Readable
* - string
*
*
* - `contentType`/`contentDisposition`/`metadata` HTTP adapter
*/
uploadObject(params: UploadObjectParams): Promise<UploadObjectResult>;
/**
* Node.js `Readable`
*
*
* -
* - /
*/
downloadObject(params: DownloadObjectParams): Promise<DownloadObjectResult>;
/**
* key
*
*
* - adapter
*/
deleteObject(params: DeleteObjectParams): Promise<DeleteObjectResult>;
/**
* key
*
*
* - adapter
* - `deleted` / key
*/
deleteObjectsByMultiKeys(params: DeleteObjectsParams): Promise<DeleteObjectsResult>;
/**
*
*
*
* - `prefix` `team/{teamId}/` bucket
* - adapter list delete
*
*
* - 使 `this.listObjects` `this.deleteObjectsByMultiKeys`
*
*/
deleteObjectsByPrefix(params: DeleteObjectsByPrefixParams): Promise<DeleteObjectsResult>;
/**
* URLPresigned URL
*
*
* - PUT
*
*
* - header adapter SDK
* - `metadata` headers `x-oss-meta-*` / `x-cos-meta-*`
* 沿 `metadata`
*/
generatePresignedPutUrl(params: PresignedPutUrlParams): Promise<PresignedPutUrlResult>;
/**
* URLPresigned URL
*
*
* - GET
*/
generatePresignedGetUrl(params: PresignedGetUrlParams): Promise<PresignedGetUrlResult>;
/**
* 访 URL
*/
generatePublicGetUrl(params: GeneratePublicGetUrlParams): GeneratePublicGetUrlResult;
/**
* key
*
*
* - key /marker/continuationToken
* `ListObjectsParams/Result`
* - `prefix` bucket /
* - listObjects + deleteObjectsByMultiKeys 1000 list
* 使 `deleteObjectsByPrefix`adapter
*/
listObjects(params: ListObjectsParams): Promise<ListObjectsResult>;
/**
*
*/
copyObjectInSelfBucket(params: CopyObjectParams): Promise<CopyObjectResult>;
/**
* Metadata
*
*
* - metadata adapter
* - metadata key 使 ASCII key
*/
getObjectMetadata(params: GetObjectMetadataParams): Promise<GetObjectMetadataResult>;
/**
* /
*
*
* - SDK 退
* - adapter resolved Promise
*/
destroy(): Promise<void>;
}

View File

@ -0,0 +1,388 @@
import type { Readable } from 'node:stream';
/**
* S3/MinIO/OSS/COS/...
*
*
* - **** SDK
* - **Node ** DOM `Blob` / `ReadableStream`
* - ****ACL adapter
*/
/**
* Bucket name
*
*
* - string
* -
*/
export type StorageBucketName = string;
/**
* keyObject key / object path
*
*
* - bucket
* - `a/b/c.txt` `/`
*/
export type StorageObjectKey = string;
/**
* Metadata
*
*
* - adapter
* - key/value 使 `Record<string, string>` adapter /
*
*
* - key 使 ASCII 线/线
*/
export type StorageObjectMetadata = Record<string, string>;
/**
* bodyNode.js
*
* - `Buffer`: /
* - `string`: UTF-8
* - `Readable`:
*
*
* - `Blob` tsconfig
*/
export type StorageUploadBody = Buffer | string | Readable;
/**
* `ensureBucket`
*/
export type EnsureBucketResult = {
/** 调用前 bucket 是否已经存在。 */
exists: boolean;
/** 本次调用是否创建了 bucket存在则为 false。 */
created: boolean;
/** bucket 名称(回显)。 */
bucket: StorageBucketName;
};
/**
*
*/
export type UploadObjectParams = {
/** 对象 key。 */
key: StorageObjectKey;
/** 上传内容。 */
body: StorageUploadBody;
/**
* MIME Content-Type
*
* `image/png``application/pdf``text/plain; charset=utf-8`
*/
contentType?: string;
/**
*
*
*
* - SDK/ SDK chunked
*/
contentLength?: number;
/**
* Content-Disposition
*
*
* - `inline`
* - `attachment; filename="report.pdf"`
*/
contentDisposition?: string;
/**
* key/value
*
*
* - 使 header / `x-amz-meta-` adapter
* - key
*/
metadata?: StorageObjectMetadata;
};
/**
*
*
*
* - `bucket` `key` ETag//
*
*/
export type UploadObjectResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*/
export type DownloadObjectParams = {
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*/
export type DownloadObjectResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
/**
* Node.js
*
* 使
* - pipe HTTP
* `body.pipe(fs.createWriteStream(...))`
*/
body: Readable;
};
/**
*
*/
export type DeleteObjectParams = {
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*/
export type DeleteObjectResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
};
/**
* key
*/
export type DeleteObjectsParams = {
/** 要删除的对象 key 列表。 */
keys: StorageObjectKey[];
};
/**
*
*/
export type DeleteObjectsResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 删除失败的对象 key 列表。 */
keys: StorageObjectKey[];
};
/**
* URL
*/
export type PresignedPutUrlParams = {
/** 对象 key。 */
key: StorageObjectKey;
/** 过期时间(秒),可选,默认 1800 秒。 */
expiredSeconds?: number;
/**
* MIME Content-Type
*
* `image/png``application/pdf``text/plain; charset=utf-8`
*/
contentType?: string;
/**
* key/value
*
*
* - 使 header / `x-amz-meta-` adapter
* - key
*/
metadata?: StorageObjectMetadata;
};
/**
* URL
*/
export type PresignedPutUrlResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
/** 可直接访问的临时 URL。 */
putUrl: string;
/**
* PUT header
*
*
* - header S3 `x-amz-meta-*`COS `x-cos-meta-*`OSS `x-oss-meta-*`
* - 使 adapter header key/value
*
*
* - 沿 `metadata`headers
*/
metadata: Record<string, string>;
};
/**
* URL
*/
export type PresignedGetUrlParams = {
/** 对象 key。 */
key: StorageObjectKey;
/** 过期时间(秒),可选,默认 1800 秒。 */
expiredSeconds?: number;
};
/**
* URL
*/
export type PresignedGetUrlResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
/** 可直接访问的临时 URL。 */
getUrl: string;
};
/**
* 访 URL
*/
export type GeneratePublicGetUrlParams = {
/** 对象 key。 */
key: StorageObjectKey;
};
/**
* 访 URL
*/
export type GeneratePublicGetUrlResult = {
/** 可直接访问的公共 URL。 */
publicGetUrl: string;
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*
*
* - adapter
*/
export type ListObjectsParams = {
/**
* prefix
*
* `team/123/` key
*/
prefix?: string;
};
/**
* key
*/
export type ListObjectsResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key 列表。 */
keys: StorageObjectKey[];
};
/**
*
*/
export type CopyObjectParams = {
/** 源对象 key。 */
sourceKey: StorageObjectKey;
/** 目标对象 key。 */
targetKey: StorageObjectKey;
};
/**
*
*/
export type CopyObjectResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 源对象 key。 */
sourceKey: StorageObjectKey;
/** 目标对象 key。 */
targetKey: StorageObjectKey;
};
/**
*
*/
export type GetObjectMetadataParams = {
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*/
export type GetObjectMetadataResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
/** 元数据。 */
metadata: StorageObjectMetadata;
/** MIME 类型Content-Type。 */
contentType?: string;
/** 内容长度(字节)。 */
contentLength?: number;
/** ETag。 */
etag?: string;
};
/**
*
*
*
* - prefix
* - API
*/
export type DeleteObjectsByPrefixParams = {
/**
* prefix****
*
*
* - bucket
*
*
* - 使`team/{teamId}/``dataset/{datasetId}/`
*/
prefix: string;
};
/**
*
*/
export type ExistsObjectParams = {
/** 对象 key。 */
key: StorageObjectKey;
};
/**
*
*/
export type ExistsObjectResult = {
/** bucket 名称。 */
bucket: StorageBucketName;
/** 对象 key。 */
key: StorageObjectKey;
/** 对象是否存在。 */
exists: boolean;
};

View File

@ -0,0 +1,15 @@
{
"compilerOptions": {
"target": "esnext",
"module": "esnext",
"moduleResolution": "bundler",
"allowJs": true,
"skipLibCheck": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"noEmit": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"isolatedModules": true,
}
}

View File

@ -0,0 +1,16 @@
import { defineConfig } from 'tsdown';
export default defineConfig({
entry: 'src/index.ts',
format: 'esm',
dts: {
enabled: true,
sourcemap: false
},
outExtensions() {
return {
dts: '.d.ts',
js: '.js'
};
}
});

View File

@ -37,13 +37,19 @@ export const useUploadAvatar = (
}),
file.name
);
const { url, fields } = await api({ filename: file.name });
const formData = new FormData();
Object.entries(fields).forEach(([k, v]) => formData.set(k, v));
formData.set('file', compressed);
const res = await fetch(url, { method: 'POST', body: formData }); // 204
if (res.ok && res.status === 204) {
onSuccess?.(`${imageBaseUrl}${fields.key}`);
const {
url,
fields: { key, ...headers }
} = await api({ filename: file.name });
const res = await fetch(url, {
method: 'PUT',
body: compressed,
headers: {
...headers
}
});
if (res.ok && res.status === 200) {
onSuccess?.(`${imageBaseUrl}${key}`);
}
});
},

File diff suppressed because it is too large Load Diff

View File

@ -34,18 +34,6 @@ AIPROXY_API_TOKEN=aiproxy
# OPENAI_BASE_URL=https://api.openai.com/v1
# CHAT_API_KEY=sk-xxxx
# S3 Config
S3_EXTERNAL_BASE_URL=
S3_ENDPOINT=localhost
S3_PORT=9000
S3_USE_SSL=false
S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin
S3_PUBLIC_BUCKET=fastgpt-public # 插件文件存储公开桶
S3_PRIVATE_BUCKET=fastgpt-private # 插件文件存储公开桶
S3_PATH_STYLE=true # forcePathStyle 默认为 true, 当且仅当设置为 false 时关闭, 其他值都为 true
S3_REGION= # 如果是本地部署的 MinIO等服务就不需要如果是云服务就需要比如 aws 的或者国内 oss 厂商
# Redis URL
REDIS_URL=redis://default:mypassword@127.0.0.1:6379
# mongo 数据库连接参数,本地开发连接远程数据库时,可能需要增加 directConnection=true 参数,才能连接上。
@ -121,3 +109,62 @@ CONFIG_JSON_PATH=
# CHAT_LOG_INTERVAL=10000
# # 日志来源ID前缀
# CHAT_LOG_SOURCE_ID_PREFIX=fastgpt-
# ✅ 对象存储供应商
# - minio: MinIO / 或者其他兼容 S3 协议的自部署对象存储服务
# - aws-s3: AWS S3
# - oss: 阿里云 OSS
# - cos: 腾讯云 COS
STORAGE_VENDOR=minio
# 地区
# - minio: 通常本地部署的对象存储服务的地区设置没什么影响 比如设置为 "us-east-1" 就可以了
# - aws-s3: 根据云服务商提供的设置 比如 "ap-northeast-1"
# - oss: 根据云服务商提供的设置 比如 "oss-cn-hangzhou"
# - cos: 根据云服务商提供的设置 比如 "ap-shanghai"
STORAGE_REGION=us-east-1
# 身份验证凭证
STORAGE_ACCESS_KEY_ID=minioadmin
STORAGE_SECRET_ACCESS_KEY=minioadmin
# 存储桶名称
# - 公开桶
# - 私有桶
STORAGE_PUBLIC_BUCKET=fastgpt-public
STORAGE_PRIVATE_BUCKET=fastgpt-private
# 一个公开的、前端和用户可以直接访问的对象存储连接
# - 比如 MinIO 的反向代理链接或者一个 CDN: https://s3.example.com
STORAGE_EXTERNAL_BASE_URL=
# ️⭕ 兼容 S3 协议的对象存储需要额外填的
# S3 端点连接 URL 为了避免歧义 填写完整的、包含协议与端口的 URL
# - 本地 MinIO: http://127.0.0.1:9000
# - docker-compose 中的 MinIO: http://fastgpt-minio:9000
STORAGE_S3_ENDPOINT=http://127.0.0.1:9000
# 路径风格配置 (virtual-host style | path style)
# - true => http(s)://endpoint/{bucket}/{key}
# - false => http(s)://{bucket}.endpoint/{key}
STORAGE_S3_FORCE_PATH_STYLE=true
# 【可选】最多请求重试次数
STORAGE_S3_MAX_RETRIES=3
# ️⭕ 阿里云 OSS 需要额外填的 参考: https://github.com/ali-sdk/ali-oss?tab=readme-ov-file#ossoptions
# 阿里云连接端点 URL
# - 比如 oss-cn-hangzhou.aliyuncs.com
# - 如果配置了 CName 记得更换为映射的域名 比如 http(s)://example.com
STORAGE_OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com
# 【可选】自定义域名 CNAME 参考: https://help.aliyun.com/zh/oss/developer-reference/initialization-10?spm=a2c4g.11186623.help-menu-31815.d_1_1_10_1.34ec79cfj3YO6w&scm=20140722.H_111256._.OR_help-T_cn~zh-V_1#9635d0c28f3p6
STORAGE_OSS_CNAME=false
# 【可选】是否开启 TLS
# - true
# - false
STORAGE_OSS_SECURE=false
# 【可选】Whether to use internal endpoint (intra-cloud)
STORAGE_OSS_INTERNAL=false
# ️⭕ 腾讯云 COS 需要额外填的 参考: https://cloud.tencent.com/document/product/436/8629#.E9.85.8D.E7.BD.AE.E9.A1.B9
# 【可选】发请求时用的协议,可选项 https:、http: 默认判断当前页面是 http: 时使用 http: 否则使用 https:
# - http:
# - https:
STORAGE_COS_PROTOCOL=http:
# 【可选】是否启用全球加速域名 默认为 false 若改为 true 需要存储桶开启全球加速功能
STORAGE_COS_USE_ACCELERATE=false
# 【可选】CNAME 的自定义域名
STORAGE_COS_CNAME_DOMAIN=
# 【可选】请求时使用 HTTP 代理例如http://127.0.0.1:8080
STORAGE_COS_PROXY=

View File

@ -26,7 +26,7 @@ import MyAvatar from '@fastgpt/web/components/common/Avatar';
import { z } from 'zod';
import { getPresignedChatFileGetUrl, getUploadChatFilePresignedUrl } from '@/web/common/file/api';
import { useContextSelector } from 'use-context-selector';
import { POST } from '@/web/common/api/request';
import { PUT } from '@/web/common/api/request';
import { getErrText } from '@fastgpt/global/common/error/utils';
import { formatFileSize } from '@fastgpt/global/common/file/tools';
import { WorkflowRuntimeContext } from '@/components/core/chat/ChatContainer/context/workflowRuntimeContext';
@ -111,7 +111,10 @@ const FileSelector = ({
try {
// Get Upload Post Presigned URL
const { url, fields } = await getUploadChatFilePresignedUrl({
const {
url,
fields: { key, ...headers }
} = await getUploadChatFilePresignedUrl({
filename: file.rawFile.name,
appId,
chatId,
@ -119,10 +122,10 @@ const FileSelector = ({
});
// Upload File to S3
const formData = new FormData();
Object.entries(fields).forEach(([k, v]) => formData.set(k, v));
formData.set('file', file.rawFile);
await POST(url, formData, {
await PUT(url, file.rawFile, {
headers: {
...headers
},
onUploadProgress: (e) => {
if (!e.total) return;
const percent = Math.round((e.loaded / e.total) * 100);
@ -136,7 +139,7 @@ const FileSelector = ({
timeout: 5 * 60 * 1000 // 5 minutes
});
const previewUrl = await getPresignedChatFileGetUrl({
key: fields.key,
key: key,
appId,
outLinkAuthData
});
@ -145,7 +148,7 @@ const FileSelector = ({
files.forEach((item) => {
if (item.id === file.id) {
item.url = previewUrl;
item.key = fields.key;
item.key = key;
item.process = 100;
}
});

View File

@ -14,7 +14,7 @@ import { type AppFileSelectConfigType } from '@fastgpt/global/core/app/type';
import { useSystemStore } from '@/web/common/system/useSystemStore';
import { type OutLinkChatAuthProps } from '@fastgpt/global/support/permission/chat';
import { getPresignedChatFileGetUrl, getUploadChatFilePresignedUrl } from '@/web/common/file/api';
import { POST } from '@/web/common/api/request';
import { PUT } from '@/web/common/api/request';
import { getUploadFileType } from '@fastgpt/global/core/app/constants';
import { parseS3UploadError } from '@fastgpt/global/common/error/s3';
@ -176,7 +176,11 @@ export const useFileUpload = (props: UseFileUploadOptions) => {
const fileIndex = fileList.findIndex((item) => item.id === file.id)!;
// Get Upload Post Presigned URL
const { url, fields, maxSize } = await getUploadChatFilePresignedUrl({
const {
url,
fields: { key, ...headers },
maxSize
} = await getUploadChatFilePresignedUrl({
filename: copyFile.rawFile.name,
appId,
chatId,
@ -184,10 +188,10 @@ export const useFileUpload = (props: UseFileUploadOptions) => {
});
// Upload File to S3
const formData = new FormData();
Object.entries(fields).forEach(([k, v]) => formData.set(k, v));
formData.set('file', copyFile.rawFile);
await POST(url, formData, {
await PUT(url, copyFile.rawFile, {
headers: {
...headers
},
onUploadProgress: (e) => {
if (!e.total) return;
const percent = Math.round((e.loaded / e.total) * 100);
@ -198,14 +202,14 @@ export const useFileUpload = (props: UseFileUploadOptions) => {
}).catch((error) => Promise.reject(parseS3UploadError({ t, error, maxSize })));
const previewUrl = await getPresignedChatFileGetUrl({
key: fields.key,
key: key,
appId,
outLinkAuthData
});
// Update file url and key
copyFile.url = previewUrl;
copyFile.key = fields.key;
copyFile.key = key;
updateFiles(fileIndex, copyFile);
} catch (error) {
errorFileIndex.push(fileList.findIndex((item) => item.id === file.id)!);

View File

@ -20,7 +20,7 @@ import { useUploadAvatar } from '@fastgpt/web/common/file/hooks/useUploadAvatar'
import { useRequest2 } from '@fastgpt/web/hooks/useRequest';
import { postCreateDatasetWithFiles, getDatasetById } from '@/web/core/dataset/api';
import { getUploadAvatarPresignedUrl, getUploadTempFilePresignedUrl } from '@/web/common/file/api';
import { POST } from '@/web/common/api/request';
import { PUT } from '@/web/common/api/request';
import { useSystemStore } from '@/web/common/system/useSystemStore';
import { getWebDefaultEmbeddingModel, getWebDefaultLLMModel } from '@/web/common/system/utils';
import { getErrText } from '@fastgpt/global/common/error/utils';
@ -82,15 +82,18 @@ const QuickCreateDatasetModal = ({
await Promise.all(
files.map(async ({ fileId, file }) => {
try {
const { url, fields, maxSize } = await getUploadTempFilePresignedUrl({
const {
url,
fields: { key, ...headers },
maxSize
} = await getUploadTempFilePresignedUrl({
filename: file.name
});
const formData = new FormData();
Object.entries(fields).forEach(([k, v]) => formData.set(k, v));
formData.set('file', file);
await POST(url, formData, {
await PUT(url, file, {
headers: {
...headers
},
onUploadProgress: (e) => {
if (!e.total) return;
const percent = Math.round((e.loaded / e.total) * 100);
@ -115,7 +118,7 @@ const QuickCreateDatasetModal = ({
item.id === fileId
? {
...item,
dbFileId: fields.key,
dbFileId: key,
isUploading: false,
uploadedFileRate: 100
}

View File

@ -55,21 +55,17 @@ const ImportPluginModal = ({
)
);
const presignedData = await getPkgPluginUploadURL({ filename: file.name });
const formData = new FormData();
Object.entries(presignedData.formData).forEach(([key, value]) => {
formData.append(key, value);
const { formData, objectName, postURL } = await getPkgPluginUploadURL({
filename: file.name
});
formData.append('file', file.file);
await postS3UploadFile(presignedData.postURL, formData);
await postS3UploadFile(postURL, file.file, { ...formData });
setUploadedFiles((prev) =>
prev.map((f) => (f.name === file.name ? { ...f, status: 'parsing' } : f))
);
const parseResult = await parseUploadedPkgPlugin({ objectName: presignedData.objectName });
const parseResult = await parseUploadedPkgPlugin({ objectName });
const parentId = parseResult.find((item) => !item.parentId)?.toolId;
if (!parentId) {

View File

@ -14,7 +14,7 @@ import { formatFileSize } from '@fastgpt/global/common/file/tools';
import { getFileIcon } from '@fastgpt/global/common/file/icon';
import { DatasetPageContext } from '@/web/core/dataset/context/datasetPageContext';
import { getUploadDatasetFilePresignedUrl } from '@/web/common/file/api';
import { POST } from '@/web/common/api/request';
import { PUT } from '@/web/common/api/request';
import { parseS3UploadError } from '@fastgpt/global/common/error/s3';
const DataProcess = dynamic(() => import('../commonProgress/DataProcess'));
@ -68,16 +68,20 @@ const SelectFile = React.memo(function SelectFile() {
await Promise.all(
files.map(async ({ fileId, file }) => {
try {
const { url, fields, maxSize } = await getUploadDatasetFilePresignedUrl({
const {
url,
fields: { key, ...headers },
maxSize
} = await getUploadDatasetFilePresignedUrl({
filename: file.name,
datasetId
});
// Upload File to S3
const formData = new FormData();
Object.entries(fields).forEach(([k, v]) => formData.set(k, v));
formData.set('file', file);
await POST(url, formData, {
await PUT(url, file, {
headers: {
...headers
},
onUploadProgress: (e) => {
if (!e.total) return;
const percent = Math.round((e.loaded / e.total) * 100);
@ -102,7 +106,7 @@ const SelectFile = React.memo(function SelectFile() {
item.id === fileId
? {
...item,
dbFileId: fields.key,
dbFileId: key,
isUploading: false,
uploadedFileRate: 100
}

View File

@ -38,7 +38,7 @@ async function handler(
const bucket = new S3PrivateBucket();
const { fileKey } = getFileS3Key.temp({ teamId, filename });
return await bucket.createPostPresignedUrl({ rawKey: fileKey, filename }, { expiredHours: 1 });
return await bucket.createPresignedPutUrl({ rawKey: fileKey, filename }, { expiredHours: 1 });
}
export default NextAPI(handler);

View File

@ -34,7 +34,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
getS3DatasetSource().getFileStream(fileId)
]);
if (!file) {
if (!file || !fileStream) {
return Promise.reject(CommonErrEnum.fileNotFound);
}
@ -49,7 +49,9 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
'Content-Disposition',
`${disposition}; filename="${encodeURIComponent(filename)}"`
);
res.setHeader('Content-Length', file.contentLength);
if (file.contentLength) {
res.setHeader('Content-Length', file.contentLength);
}
stream.pipe(res);

View File

@ -15,7 +15,9 @@ async function handler(req: ApiRequestProps<PresignChatFileGetUrlParams>): Promi
...outLinkAuthData
});
return await getS3ChatSource().createGetChatFileURL({ key, external: true });
const { getUrl: url } = await getS3ChatSource().createGetChatFileURL({ key, external: true });
return url;
}
export default NextAPI(handler);

View File

@ -79,11 +79,13 @@ async function handler(
collection.fileId &&
isS3ObjectKey(collection.fileId, 'dataset')
) {
return getS3DatasetSource().createGetDatasetFileURL({
key: collection.fileId,
expiredHours: 1,
external: true
});
return (
await getS3DatasetSource().createGetDatasetFileURL({
key: collection.fileId,
expiredHours: 1,
external: true
})
).getUrl;
}
if (collection.type === DatasetCollectionTypeEnum.link && collection.rawLink) {
return collection.rawLink;

View File

@ -83,7 +83,7 @@ async function handler(
const s3ImageIds = imageIds.filter((id) => isS3ObjectKey(id, 'dataset'));
for (const id of s3ImageIds) {
const metadata = await getS3DatasetSource().getFileMetadata(id);
if (metadata) {
if (metadata?.contentLength) {
imageSizeMap.set(id, metadata.contentLength);
}
}

View File

@ -32,8 +32,17 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
})()
);
if (!stream) {
return jsonRes(res, {
code: 404,
error: 'File not found'
});
}
if (metadata) {
res.setHeader('Content-Type', metadata.contentType);
}
if (metadata?.contentLength) {
res.setHeader('Content-Length', metadata.contentLength);
}
res.setHeader('Cache-Control', 'public, max-age=31536000');

View File

@ -179,13 +179,15 @@ function request(
{ cancelToken, maxQuantity, withCredentials, ...config }: ConfigType,
method: Method
): any {
/* 去空 */
for (const key in data) {
const val = data[key];
if (data[key] === undefined) {
delete data[key];
} else if (val instanceof Date) {
data[key] = dayjs(val).format();
if (!(data instanceof Blob)) {
/* 去空 */
for (const key in data) {
const val = data[key];
if (data[key] === undefined) {
delete data[key];
} else if (val instanceof Date) {
data[key] = dayjs(val).format();
}
}
}

View File

@ -1,16 +1,18 @@
import { POST } from '@/web/common/api/request';
import { POST, PUT } from '@/web/common/api/request';
import type { OutLinkChatAuthProps } from '@fastgpt/global/support/permission/chat';
import type { CreatePostPresignedUrlResult } from '@fastgpt/service/common/s3/type';
import { type AxiosProgressEvent } from 'axios';
export const postS3UploadFile = (
postURL: string,
form: FormData,
file: File,
headers?: Record<string, string>,
onUploadProgress?: (progressEvent: AxiosProgressEvent) => void
) =>
POST(postURL, form, {
PUT(postURL, file, {
timeout: 600000,
onUploadProgress
onUploadProgress,
...(headers ? { headers } : {})
});
export const getUploadAvatarPresignedUrl = (params: {

View File

@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2022",
"target": "esnext",
"lib": ["dom", "dom.iterable", "esnext"],
"allowJs": true,
"skipLibCheck": true,
@ -9,7 +9,7 @@
"noEmit": true,
"esModuleInterop": true,
"module": "esnext",
"moduleResolution": "node",
"moduleResolution": "bundler",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",