FastGPT/packages/service/core/dataset/read.ts
Archer 58000324e2
feature: V4.14.3 (#5970)
* feat(marketplace): update plugin/ download count statistic (#5957)

* feat: download count

* feat: update ui

* fix: ui

* chore: update sdk version

* chore: update .env.template

* chore: adjust

* chore: remove console.log

* chore: adjust

* Update projects/marketplace/src/pages/index.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/marketplace/src/pages/index.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/app/src/pages/config/tool/marketplace.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: update refresh; feat: marketplace download count per hour

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* download

* marketplace code

* fix: ui (#5963)

* feat: support dataset and files as global variables (#5961)

* json & dataset

* file

* fix file var

* fix

* fix init

* remove

* perf: file vars

* fix: file uploading errors (#5969)

* fix: file uploading errors

* fix build

* perf: fileselector ux

* feat: integrate S3 for dataset with compatibility (#5941)

* fix: text split

* remove test

* feat: integrate S3 for dataset with compatibility

* fix: delay s3 files delete timing

* fix: remove imageKeys

* fix: remove parsed images' TTL

* fix: improve codes by pr comments

---------

Co-authored-by: archer <545436317@qq.com>

* remove log

* perf: request limit

* chore: s3 migration script (#5971)

* test

* perf: s3 code

* fix: migration script (#5972)

* perf: s3 move object

* wip: fix s3 bugs (#5976)

* fix: incorrect replace origin logic (#5978)

* fix: add downloadURL (#5980)

* perf: file variable ttl & quick create dataset with temp s3 bucket (#5973)

* perf: file variable ttl & quick create dataset with temp s3 bucket

* fix

* plugin & form input variables (#5979)

* plugin & form input variables

* fix

* docs: 4143.mdx (#5981)

* doc: update 4143.mdx (#5982)

* fix form input file ttl (#5983)

* trans file type (#5986)

* trans file type

* fix

* fix: S3 script early return (#5985)

* fix: S3 script typeof

* fix: truncate large filename to fit S3 name

* perf(permission): add a schema verification for resource permission, tmbId, groupId, orgId should be set at least one of them (#5987)

* fix: version & typo (#5988)

* fix-v4.14.3 (#5991)

* fix: empty alt make replace JWT failed & incorrect image dataset preview url (#5989)

* fix: empty alt make replace JWT failed & incorrect image dataset preview url

* fix: s3 files recovery script

* fix: incorrect chat external url parsing (#5993)

---------

Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: heheer <heheer@sealos.io>
Co-authored-by: Roy <whoeverimf5@gmail.com>
2025-11-26 20:47:28 +08:00

import {
  ChunkTriggerConfigTypeEnum,
  DatasetSourceReadTypeEnum
} from '@fastgpt/global/core/dataset/constants';
import { urlsFetch } from '../../common/string/cheerio';
import { type TextSplitProps } from '@fastgpt/global/common/string/textSplitter';
import axios from 'axios';
import { readS3FileContentByBuffer } from '../../common/file/read/utils';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
import { getApiDatasetRequest } from './apiDataset';
import Papa from 'papaparse';
import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
import { text2Chunks } from '../../worker/function';
import { addLog } from '../../common/system/log';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { getFileMaxSize } from '../../common/file/utils';
import { UserError } from '@fastgpt/global/common/error/utils';
import { getS3DatasetSource, S3DatasetSource } from '../../common/s3/sources/dataset';
import { getFileS3Key, isS3ObjectKey } from '../../common/s3/utils';
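
/**
 * Download a file from a URL and parse it into raw text.
 *
 * The file size is checked twice: first via a best-effort HTTP HEAD request,
 * then while streaming the response body, so oversized downloads are aborted early.
 * The buffered content is parsed with readS3FileContentByBuffer; images extracted
 * during parsing are stored under the dataset's parsed-file prefix in S3.
 */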
export const readFileRawTextByUrl = async ({
  teamId,
  tmbId,
  url,
  customPdfParse,
  getFormatText,
  relatedId,
  datasetId,
  maxFileSize = getFileMaxSize()
}: {
  teamId: string;
  tmbId: string;
  url: string;
  customPdfParse?: boolean;
  getFormatText?: boolean;
  relatedId: string; // externalFileId / apiFileId
  datasetId: string;
  maxFileSize?: number;
}) => {
  const extension = parseFileExtensionFromUrl(url);

  // Check file size
  try {
    const headResponse = await axios.head(url, { timeout: 10000 });
    const contentLength = parseInt(headResponse.headers['content-length'] || '0');
    if (contentLength > 0 && contentLength > maxFileSize) {
      return Promise.reject(
        `File too large. Size: ${Math.round(contentLength / 1024 / 1024)}MB, Maximum allowed: ${Math.round(maxFileSize / 1024 / 1024)}MB`
      );
    }
  } catch (error) {
    addLog.warn('Check file HEAD request failed');
  }

  // Use stream response type, avoid double memory usage
  const response = await axios({
    method: 'get',
    url: url,
    responseType: 'stream',
    maxContentLength: maxFileSize,
    timeout: 30000
  });

  // Optimization: convert the stream directly into a buffer, avoiding an intermediate arraybuffer step
  const chunks: Buffer[] = [];
  let totalLength = 0;

  return new Promise<{ rawText: string }>((resolve, reject) => {
    let isAborted = false;

    const cleanup = () => {
      if (!isAborted) {
        isAborted = true;
        chunks.length = 0; // Free the buffered chunks
        response.data.destroy();
      }
    };

    // Stream timeout (10 minutes)
    const timeoutId = setTimeout(() => {
      cleanup();
      reject('File download timeout after 10 minutes');
    }, 600000);

    response.data.on('data', (chunk: Buffer) => {
      if (isAborted) return;

      totalLength += chunk.length;
      if (totalLength > maxFileSize) {
        clearTimeout(timeoutId);
        cleanup();
        return reject(
          `File too large. Maximum size allowed is ${Math.round(maxFileSize / 1024 / 1024)}MB.`
        );
      }

      chunks.push(chunk);
    });

    response.data.on('end', async () => {
      if (isAborted) return;
      clearTimeout(timeoutId);

      try {
        // Merge all chunks into a single buffer
        const buffer = Buffer.concat(chunks);
        // Clear the chunks array immediately to release memory
        chunks.length = 0;

        const { rawText } = await retryFn(() => {
          const { fileParsedPrefix } = getFileS3Key.dataset({
            datasetId,
            filename: 'file'
          });
          return readS3FileContentByBuffer({
            customPdfParse,
            getFormatText,
            extension,
            teamId,
            tmbId,
            buffer,
            encoding: 'utf-8',
            imageKeyOptions: {
              // TODO: images parsed from links never expire; they should also be deleted when the dataset is deleted
              prefix: fileParsedPrefix
            }
          });
        });

        resolve({ rawText });
      } catch (error) {
        cleanup();
        reject(error);
      }
    });

    response.data.on('error', (error: Error) => {
      clearTimeout(timeoutId);
      cleanup();
      reject(error);
    });

    response.data.on('close', () => {
      clearTimeout(timeoutId);
      cleanup();
    });
  });
};

/*
  fileLocal - dataset file stored in S3, read by object key
  link - fetch content from the url
  externalFile / apiFile - read via remote request
*/
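/*
  Illustrative call (a sketch only; the variable values are hypothetical):

  const { title, rawText } = await readDatasetSourceRawText({
    teamId,
    tmbId,
    type: DatasetSourceReadTypeEnum.link,
    sourceId: 'https://example.com/handbook',
    selector: 'article',
    datasetId
  });
*/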
export const readDatasetSourceRawText = async ({
  teamId,
  tmbId,
  type,
  sourceId,
  selector,
  externalFileId,
  apiDatasetServer,
  customPdfParse,
  getFormatText,
  usageId,
  datasetId
}: {
  teamId: string;
  tmbId: string;
  type: DatasetSourceReadTypeEnum;
  sourceId: string;
  customPdfParse?: boolean;
  getFormatText?: boolean;
  selector?: string; // link selector
  externalFileId?: string; // external file dataset
  apiDatasetServer?: ApiDatasetServerType; // api dataset
  usageId?: string;
  datasetId: string; // For S3 image upload
}): Promise<{
  title?: string;
  rawText: string;
}> => {
  if (type === DatasetSourceReadTypeEnum.fileLocal) {
    if (!datasetId || !isS3ObjectKey(sourceId, 'dataset')) {
      return Promise.reject('datasetId is required for S3 files');
    }

    const { filename, rawText } = await getS3DatasetSource().getDatasetFileRawText({
      teamId,
      tmbId,
      fileId: sourceId,
      getFormatText,
      customPdfParse,
      usageId,
      datasetId
    });
    return {
      title: filename,
      rawText
    };
  } else if (type === DatasetSourceReadTypeEnum.link) {
    const result = await urlsFetch({
      urlList: [sourceId],
      selector
    });
    const { title = sourceId, content = '' } = result[0];
    if (!content || content === 'Cannot fetch internal url') {
      return Promise.reject(content || 'Can not fetch content from link');
    }

    return {
      title,
      rawText: content
    };
  } else if (type === DatasetSourceReadTypeEnum.externalFile) {
    if (!externalFileId) return Promise.reject(new UserError('FileId not found'));
    const { rawText } = await readFileRawTextByUrl({
      teamId,
      tmbId,
      url: sourceId,
      relatedId: externalFileId,
      datasetId,
      customPdfParse
    });
    return {
      rawText
    };
  } else if (type === DatasetSourceReadTypeEnum.apiFile) {
    const { title, rawText } = await readApiServerFileContent({
      apiDatasetServer,
      apiFileId: sourceId,
      teamId,
      tmbId,
      customPdfParse,
      datasetId
    });
    return {
      title,
      rawText
    };
  }

  return {
    title: '',
    rawText: ''
  };
};
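
/**
 * Read a file's content from an API dataset server.
 * Resolves the API dataset client for the given server config and delegates to its getFileContent method.
 */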
export const readApiServerFileContent = async ({
  apiDatasetServer,
  apiFileId,
  teamId,
  tmbId,
  customPdfParse,
  datasetId
}: {
  apiDatasetServer?: ApiDatasetServerType;
  apiFileId: string;
  teamId: string;
  tmbId: string;
  customPdfParse?: boolean;
  datasetId: string;
}): Promise<{
  title?: string;
  rawText: string;
}> => {
  return (await getApiDatasetRequest(apiDatasetServer)).getFileContent({
    teamId,
    tmbId,
    apiFileId,
    customPdfParse,
    datasetId
  });
};
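
/**
 * Split raw text into q/a chunks for dataset import.
 *
 * - backupParse: treat rawText as a backup CSV (columns: q, a, extra index columns) and parse it with Papa.
 * - maxSize trigger: if the text is shorter than maxSize * 0.7 (or 16000 by default), return it as a single chunk.
 * - minSize trigger: unless forceChunk is set, text shorter than chunkTriggerMinSize is returned as a single chunk.
 * - Otherwise the text is split by text2Chunks using chunkSize and the remaining split props.
 *
 * @example
 * // A sketch only: assumes the remaining TextSplitProps fields are optional,
 * // and the parameter values are illustrative.
 * const chunks = await rawText2Chunks({
 *   rawText: longText,
 *   chunkTriggerType: ChunkTriggerConfigTypeEnum.minSize,
 *   chunkTriggerMinSize: 1000,
 *   chunkSize: 512
 * });
 */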
export const rawText2Chunks = async ({
  rawText = '',
  chunkTriggerType = ChunkTriggerConfigTypeEnum.minSize,
  chunkTriggerMinSize = 1000,
  backupParse,
  chunkSize = 512,
  imageIdList,
  ...splitProps
}: {
  rawText: string;
  imageIdList?: string[];
  chunkTriggerType?: ChunkTriggerConfigTypeEnum;
  chunkTriggerMinSize?: number; // maxSize from agent model, not store
  backupParse?: boolean;
  tableParse?: boolean;
} & TextSplitProps): Promise<
  {
    q: string;
    a: string;
    indexes?: string[];
    imageIdList?: string[];
  }[]
> => {
  const parseDatasetBackup2Chunks = (rawText: string) => {
    const csvArr = Papa.parse(rawText).data as string[][];

    const chunks = csvArr
      .slice(1)
      .map((item) => ({
        q: item[0] || '',
        a: item[1] || '',
        indexes: item.slice(2).filter((item) => item.trim()),
        imageIdList
      }))
      .filter((item) => item.q || item.a);

    return {
      chunks
    };
  };

  if (backupParse) {
    return parseDatasetBackup2Chunks(rawText).chunks;
  }

  // Chunk condition
  // 1. Max-size trigger: only split when the text exceeds the maximum size (defaults to 0.7 * the model's maximum)
  if (chunkTriggerType === ChunkTriggerConfigTypeEnum.maxSize) {
    const textLength = rawText.trim().length;
    const maxSize = splitProps.maxSize ? splitProps.maxSize * 0.7 : 16000;
    if (textLength < maxSize) {
      return [
        {
          q: rawText,
          a: '',
          imageIdList
        }
      ];
    }
  }

  // 2. Min-size trigger: only split when the text exceeds the manually configured minimum
  if (chunkTriggerType !== ChunkTriggerConfigTypeEnum.forceChunk) {
    const textLength = rawText.trim().length;
    if (textLength < chunkTriggerMinSize) {
      return [{ q: rawText, a: '', imageIdList }];
    }
  }

  const { chunks } = await text2Chunks({
    text: rawText,
    chunkSize,
    ...splitProps
  });

  return chunks.map((item) => ({
    q: item,
    a: '',
    indexes: [],
    imageIdList
  }));
};