FastGPT/packages/service/worker/utils.ts
Archer 44e9299d5e
Some checks are pending
Document deploy / sync-images (push) Waiting to run
Document deploy / generate-timestamp (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.cn suffix:cn]) (push) Blocked by required conditions
Document deploy / build-images (map[domain:https://fastgpt.io suffix:io]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.cn kube_config:KUBE_CONFIG_CN suffix:cn]) (push) Blocked by required conditions
Document deploy / update-images (map[deployment:fastgpt-docs domain:https://fastgpt.io kube_config:KUBE_CONFIG_IO suffix:io]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / get-vars (push) Waiting to run
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:amd64 runs-on:ubuntu-24.04]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / build-fastgpt-images (map[arch:arm64 runs-on:ubuntu-24.04-arm]) (push) Blocked by required conditions
Build FastGPT images in Personal warehouse / release-fastgpt-images (push) Blocked by required conditions
V4.13.2 features (#5792)
* add manual create http toolset (#5743)

* add manual create http toolset

* optimize code

* optimize

* fix

* fix

* rename filename

* feat: integrate ts-rest (#5741)

* feat: integrate ts-rest

* chore: classify core contract and pro contract

* chore: update lockfile

* chore: tweak dir structure

* chore: tweak dir structure

* update tsrest code (#5755)

* doc

* update tsrest code

* fix http toolset (#5753)

* fix http toolset

* fix

* perf: http toolset

* fix: toolresponse result (#5760)

* doc

* fix: toolresponse result

* fix: mongo watch

* remove log

* feat: integrated to minio (#5748)

* feat: migrate to minio

* feat: migrate apps' and dataset's avatar to minio

* feat: migrate more avatars to minio

* fix: lock file

* feat: migrate copyright settings' logo to minio

* feat: integrate minio

* chore: improve code

* chore: rename variables

* refactor: s3 class

* fix: s3 and mongo operations

* chore: add session for avatar source

* fix: init s3 buckets

* fix: bugbot issues

* expired time code

* perf: avatar code

* union type

* export favouriteContract

* empty bucket check

---------

Co-authored-by: archer <545436317@qq.com>

* refactor: zod schema to generate OpenAPI instead (#5771)

* doc

* fix: text split code (#5773)

* fix: toolresponse result

* remove log

* stream remove

* fix: text split code

* fix: workflow (#5779)

* fix: toolresponse result

* remove log

* fix: value check

* fix: workflow

* openapi doc

* perf: bucket delete cron

* doc

* feat: apikey health

* feat: export variables

* api code move

* perf: workflow performance (#5783)

* perf: reactflow context

* perf: workflow context split

* perf: nodeList computed map

* perf: nodes dependen

* perf: workflow performance

* workflow performance

* removel og

* lock

* version

* loop drag

* reactflow size

* reactflow size

* fix: s3init (#5784)

* doc

* fix: s3init

* perf: dynamic import

* remove moongose dep

* worker build

* worker code

* perf: worker build

* fix: error throw

* doc

* doc

* fix: build

* fix: dockerfile

* nextjs config

* fix: worker

* fix: build (#5791)

* fix: build

* vector cache code

* fix: app info modal avatar upload method replace (#5787)

* fix: app info modal avatar upload method replace

* chore: replace all useSelectFile with useUploadAvatar

* remove invalid code

* add size

* Update projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/nodes/render/RenderInput/templates/CommonInputForm.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update projects/app/src/pageComponents/app/detail/WorkflowComponents/context/workflowInitContext.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: heheer <heheer@sealos.io>
Co-authored-by: 伍闲犬 <whoeverimf5@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-20 19:08:21 +08:00

226 lines
6.2 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Worker } from 'worker_threads';
import path from 'path';
import { addLog } from '../common/system/log';
export enum WorkerNameEnum {
readFile = 'readFile',
htmlStr2Md = 'htmlStr2Md',
countGptMessagesTokens = 'countGptMessagesTokens',
systemPluginRun = 'systemPluginRun',
text2Chunks = 'text2Chunks'
}
export const getSafeEnv = () => {
return {
LOG_LEVEL: process.env.LOG_LEVEL,
STORE_LOG_LEVEL: process.env.STORE_LOG_LEVEL,
NODE_ENV: process.env.NODE_ENV,
HTTP_PROXY: process.env.HTTP_PROXY,
HTTPS_PROXY: process.env.HTTPS_PROXY,
NO_PROXY: process.env.NO_PROXY
};
};
export const getWorker = (name: `${WorkerNameEnum}`) => {
const workerPath = path.join(process.cwd(), 'worker', `${name}.js`);
return new Worker(workerPath, {
env: getSafeEnv()
});
};
export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string, any>) => {
return new Promise<T>((resolve, reject) => {
const start = Date.now();
const worker = getWorker(name);
worker.postMessage(params);
worker.on('message', (msg: { type: 'success' | 'error'; data: any }) => {
if (msg.type === 'error') return reject(msg.data);
resolve(msg.data);
const time = Date.now() - start;
if (time > 1000) {
addLog.info(`Worker ${name} run time: ${time}ms`);
}
});
worker.on('error', (err) => {
reject(err);
worker.terminate();
});
worker.on('messageerror', (err) => {
reject(err);
worker.terminate();
});
});
};
type WorkerRunTaskType<T> = { data: T; resolve: (e: any) => void; reject: (e: any) => void };
type WorkerQueueItem = {
id: string;
worker: Worker;
status: 'running' | 'idle';
taskTime: number;
timeoutId?: NodeJS.Timeout;
resolve: (e: any) => void;
reject: (e: any) => void;
};
type WorkerResponse<T = any> = {
id: string;
type: 'success' | 'error';
data: T;
};
/*
多线程任务管理
* 全局只需要创建一个示例
* 可以设置最大常驻线程(不会被销毁),线程满了后,后续任务会等待执行。
* 每次执行,会把数据丢到一个空闲线程里运行。主线程需要监听子线程返回的数据,并执行对于的 callback主要是通过 workerId 进行标记。
* 务必保证,每个线程只会同时运行 1 个任务,否则 callback 会对应不上。
*/
export class WorkerPool<Props = Record<string, any>, Response = any> {
name: WorkerNameEnum;
maxReservedThreads: number;
workerQueue: WorkerQueueItem[] = [];
waitQueue: WorkerRunTaskType<Props>[] = [];
constructor({ name, maxReservedThreads }: { name: WorkerNameEnum; maxReservedThreads: number }) {
this.name = name;
this.maxReservedThreads = maxReservedThreads;
}
private runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
// Get idle worker or create a new worker
const runningWorker = (() => {
// @ts-ignore
if (data.workerId) {
// @ts-ignore
const worker = this.workerQueue.find((item) => item.id === data.workerId);
if (worker) return worker;
}
const worker = this.workerQueue.find((item) => item.status === 'idle');
if (worker) return worker;
if (this.workerQueue.length < this.maxReservedThreads) {
return this.createWorker();
}
})();
if (runningWorker) {
// Update memory data to latest task
runningWorker.status = 'running';
runningWorker.taskTime = Date.now();
runningWorker.resolve = resolve;
runningWorker.reject = reject;
runningWorker.timeoutId = setTimeout(() => {
reject('Worker timeout');
}, 30000);
runningWorker.worker.postMessage({
id: runningWorker.id,
...data
});
} else {
// Not enough worker, push to wait queue
this.waitQueue.push({ data, resolve, reject });
}
}
run(data: Props) {
// watch memory
// addLog.debug(`${this.name} worker queueLength: ${this.workerQueue.length}`);
return new Promise<Response>((resolve, reject) => {
/*
Whether the task is executed immediately or delayed, the promise callback will dispatch after task complete.
*/
this.runTask({
data,
resolve,
reject
});
}).finally(() => {
// Run wait queue
const waitTask = this.waitQueue.shift();
if (waitTask) {
this.runTask(waitTask);
}
});
}
createWorker() {
// Create a new worker and push it queue.
const workerId = `${Date.now()}${Math.random()}`;
const worker = getWorker(this.name);
const item: WorkerQueueItem = {
id: workerId,
worker,
status: 'running',
taskTime: Date.now(),
resolve: () => {},
reject: () => {}
};
this.workerQueue.push(item);
// watch response
worker.on('message', ({ id, type, data }: WorkerResponse<Response>) => {
if (type === 'success') {
item.resolve(data);
} else if (type === 'error') {
item.reject(data);
}
// Clear timeout timer and update worker status
clearTimeout(item.timeoutId);
item.status = 'idle';
});
// Worker error, terminate and delete it.Un catch error)
worker.on('error', (err) => {
console.log(err);
addLog.error('Worker error', err);
this.deleteWorker(workerId);
});
worker.on('messageerror', (err) => {
console.log(err);
addLog.error('Worker messageerror', err);
this.deleteWorker(workerId);
});
return item;
}
private deleteWorker(workerId: string) {
const item = this.workerQueue.find((item) => item.id === workerId);
if (item) {
item.reject?.('error');
clearTimeout(item.timeoutId);
item.worker.removeAllListeners();
item.worker.terminate();
}
this.workerQueue = this.workerQueue.filter((item) => item.id !== workerId);
}
}
export const getWorkerController = <Props, Response>(props: {
name: WorkerNameEnum;
maxReservedThreads: number;
}) => {
if (!global.workerPoll) {
// @ts-ignore
global.workerPoll = {};
}
const name = props.name;
if (global.workerPoll[name]) return global.workerPoll[name] as WorkerPool<Props, Response>;
global.workerPoll[name] = new WorkerPool(props);
return global.workerPoll[name] as WorkerPool<Props, Response>;
};