Feat: task pool and create upload session

This commit is contained in:
HFO4 2022-02-27 14:33:48 +08:00
parent 296ad26042
commit 01cdc5bf0e
6 changed files with 122 additions and 5 deletions

View File

@ -34,6 +34,7 @@ export default function Uploader() {
const uploadManager = useMemo(() => {
return new UploadManager({
logLevel: "INFO",
concurrentLimit: 5,
});
}, []);

View File

@ -4,15 +4,18 @@ import { UnknownPolicyError, UploaderError, UploaderErrorName } from "./errors";
import Base from "./uploader/base";
import Folder from "./uploader/folder";
import Local from "./uploader/local";
import { Pool } from "./utils/pool";
export interface Option {
logLevel: LogLevel;
onFileAdded: (task: Task, error?: UploaderError) => void;
concurrentLimit: number;
}
export default class UploadManager {
public logger: Logger;
public pool: Pool;
private static id = 0;
private policy?: Policy;
private input: HTMLInputElement;
@ -23,6 +26,8 @@ export default class UploadManager {
this.logger = new Logger(o.logLevel, "MANAGER");
this.logger.info(`Initialized with log level: ${o.logLevel}`);
this.pool = new Pool(o.concurrentLimit);
const input = document.createElement("input");
input.type = "file";
input.id = `upload-input-${this.id}`;

View File

@ -26,3 +26,20 @@ export interface Task {
}
type Nullable<T> = T | null;
export interface Response<T> {
code: number;
data: T;
msg: string;
}
interface UploadSessionRequest {
path: string;
size: number;
name: string;
policy_id: number;
}
interface UploadCredential {
sessionID: string;
}

View File

@ -1,8 +1,10 @@
// 所有 Uploader 的基类
import { Task } from "../types";
import { Task, UploadCredential } from "../types";
import UploadManager from "../index";
import Logger from "../logger";
import { validate } from "../utils/validator";
import { CancelToken, requestAPI } from "../utils/request";
import { CancelTokenSource } from "axios";
export enum Status {
added,
@ -32,6 +34,9 @@ export default abstract class Base {
protected subscriber: UploadHandlers;
// 用于取消请求
private cancelToken: CancelTokenSource = CancelToken.source();
constructor(public task: Task, protected manager: UploadManager) {
this.logger = new Logger(
this.manager.logger.level,
@ -54,20 +59,49 @@ export default abstract class Base {
};
public start = async () => {
this.logger.info("Activate uploading task.");
this.logger.info("Activate uploading task");
try {
validate(this.task.file, this.task.policy);
} catch (e) {
this.logger.info("File validate failed with error:", e);
this.setError(e);
return;
}
this.status = Status.preparing;
this.subscriber.onTransition(this.status);
this.manager.pool.enqueue(this).catch((e) => {
this.logger.info("Upload task failed with error:", e);
this.setError(e);
});
this.logger.info("Enqueued in manager pool");
};
public upload = async () => {
this.logger.info("Start upload task");
this.transit(Status.preparing);
};
public cancel = async () => {
this.cancelToken.cancel();
// TODO: delete upload session
};
protected setError(e: Error) {
this.status = Status.error;
this.subscriber.onError(e);
}
protected transit(status: Status) {
this.status = status;
this.subscriber.onTransition(status);
}
private async createUploadSession() {
const res = await requestAPI<UploadCredential>(
`/api/v3/file/upload/session`,
{
method: "put",
cancelToken: this.cancelToken.token,
}
);
}
}

View File

@ -0,0 +1,46 @@
import Base from "../uploader/base";
export interface QueueContent {
uploader: Base;
resolve: () => void;
reject: (err?: any) => void;
}
export class Pool {
queue: Array<QueueContent> = [];
processing: Array<QueueContent> = [];
constructor(private limit: number) {}
enqueue(uploader: Base) {
return new Promise<void>((resolve, reject) => {
this.queue.push({
uploader,
resolve,
reject,
});
this.check();
});
}
run(item: QueueContent) {
this.queue = this.queue.filter((v) => v !== item);
this.processing.push(item);
item.uploader.upload().then(
() => {
this.processing = this.processing.filter((v) => v !== item);
item.resolve();
this.check();
},
(err) => item.reject(err)
);
}
check() {
const processingNum = this.processing.length;
const availableNum = this.limit - processingNum;
this.queue.slice(0, availableNum).forEach((item) => {
this.run(item);
});
}
}

View File

@ -0,0 +1,14 @@
import axios, { AxiosRequestConfig } from "axios";
import { Response } from "../types";
export const { CancelToken } = axios;
export { CancelToken as CancelTokenType, CancelTokenSource } from "axios";
const defaultConfig = {
baseURL: "/api/v3",
withCredentials: true,
};
export function requestAPI<T = any>(url: string, config?: AxiosRequestConfig) {
return axios.request<Response<T>>({ ...defaultConfig, ...config, url });
}