mirror of https://github.com/cloudreve/frontend.git
synced 2025-12-25 19:52:48 +00:00

feat(uploader): concurrent chunk uploads

This commit is contained in:
parent 7fdd0efd0b
commit a095f8c612

@@ -817,6 +817,8 @@
   "nativeThumbNailsGeneralUpyun": "Für Upyun-Speicher wird der <0>Bildverarbeitungsservice</0> verwendet, um Miniaturbilder zu generieren.",
   "preallocate": "Festplattenspeicher vorab zuweisen",
   "preallocateDes": "Wenn aktiviert, wird der Upload-Anfrage des Benutzers Festplattenspeicher auf dem Speicherknoten vorab zugewiesen, nur wirksam unter Linux oder Darwin.",
+  "chunkConcurrency": "Parallele Chunk-Uploads",
+  "chunkConcurrencyDes": "Legt die Anzahl der gleichzeitigen Chunk-Uploads beim direkten Web-Upload fest.",
   "sourceWebEdit": "Web-Online-Bearbeitung",
   "uploadRelay": "Upload-Relay",
   "uploadRelayDes": "Wenn aktiviert, werden die Upload-Anfragen der Benutzer über Cloudreve an den Speicherknoten weitergeleitet. Da keine Chunk-Uploads durchgeführt werden können, passen Sie bitte das maximale Upload-Größenlimit des Webservers entsprechend an.",

@@ -818,6 +818,8 @@
   "nativeThumbNailsGeneralUpyun": "For Upyun storage, <0>image processing</0> service will be used to generate thumbnails.",
   "preallocate": "Pre-allocate disk space",
   "preallocateDes": "When enabled, the user's upload request will be pre-allocated disk space on the storage node, only effective on Linux or Darwin.",
+  "chunkConcurrency": "Concurrent chunk uploads",
+  "chunkConcurrencyDes": "Set the number of concurrent chunk uploads when using direct web upload.",
   "sourceWebEdit": "Web online editing",
   "uploadRelay": "Upload relay",
   "uploadRelayDes": "If enabled, users' upload requests will be relayed to the storage node via Cloudreve, due to the inability to perform chunked uploads, please adjust the maximum upload size limit of the web server accordingly.",

@@ -817,6 +817,8 @@
   "nativeThumbNailsGeneralUpyun": "Para almacenamiento Upyun, el servicio de <0>procesamiento de imágenes</0> se usará para generar miniaturas.",
   "preallocate": "Pre-asignar espacio en disco",
   "preallocateDes": "Cuando esté habilitado, la solicitud de subida del usuario pre-asignará espacio en disco en el nodo de almacenamiento, solo efectivo en Linux o Darwin.",
+  "chunkConcurrency": "Subidas de fragmentos concurrentes",
+  "chunkConcurrencyDes": "Establece el número de subidas de fragmentos concurrentes al usar subida directa web.",
   "sourceWebEdit": "Edición en línea web",
   "uploadRelay": "Relé de subida",
   "uploadRelayDes": "Si está habilitado, las solicitudes de subida de los usuarios serán retransmitidas al nodo de almacenamiento vía Cloudreve, debido a la incapacidad de realizar subidas fragmentadas, ajusta el límite de tamaño máximo de subida del servidor web en consecuencia.",

@@ -817,6 +817,8 @@
   "nativeThumbNailsGeneralUpyun": "Pour le stockage Upyun, le service de <0>traitement d'images</0> sera utilisé pour générer des miniatures.",
   "preallocate": "Pré-allouer l'espace disque",
   "preallocateDes": "Lorsqu'activé, la demande d'upload de l'utilisateur pré-allouera l'espace disque sur le nœud de stockage, efficace seulement sur Linux ou Darwin.",
+  "chunkConcurrency": "Uploads de chunks concurrents",
+  "chunkConcurrencyDes": "Définit le nombre d'uploads de chunks concurrents lors de l'utilisation de l'upload direct web.",
   "sourceWebEdit": "Édition en ligne Web",
   "uploadRelay": "Relais d'upload",
   "uploadRelayDes": "Si activé, les demandes d'upload des utilisateurs seront relayées vers le nœud de stockage via Cloudreve, en raison de l'incapacité à effectuer des uploads par chunks, veuillez ajuster la limite de taille d'upload maximale du serveur web en conséquence.",

@@ -817,6 +817,8 @@
   "nativeThumbNailsGeneralUpyun": "Per l'archiviazione Upyun, verrà utilizzato il servizio <0>elaborazione immagini</0> per generare miniature.",
   "preallocate": "Pre-alloca spazio disco",
   "preallocateDes": "Quando abilitato, la richiesta di caricamento dell'utente pre-allocherà spazio disco sul nodo di archiviazione, efficace solo su Linux o Darwin.",
+  "chunkConcurrency": "Caricamenti chunk concorrenti",
+  "chunkConcurrencyDes": "Imposta il numero di caricamenti chunk concorrenti quando si utilizza il caricamento diretto web.",
   "sourceWebEdit": "Modifica online web",
   "uploadRelay": "Relay caricamento",
   "uploadRelayDes": "Se abilitato, le richieste di caricamento degli utenti saranno inoltrate al nodo di archiviazione tramite Cloudreve, a causa dell'impossibilità di eseguire caricamenti in chunk, regola di conseguenza il limite massimo di dimensione caricamento del server web.",

@@ -818,6 +818,8 @@
   "nativeThumbNailsGeneralUpyun": "Upyun ストレージの場合、<0>画像処理</0>サービスを使用してサムネイルを生成します。",
   "preallocate": "ディスク領域の事前割り当て",
   "preallocateDes": "有効にすると、ユーザーがファイルをアップロードする際にディスク領域が事前に割り当てられます。LinuxまたはDarwinでのみ有効です。",
+  "chunkConcurrency": "並行チャンクアップロード数",
+  "chunkConcurrencyDes": "Webダイレクトアップロード時の並行チャンクアップロード数を設定します。",
   "sourceWebEdit": "Webオンライン編集",
   "uploadRelay": "中継アップロード",
   "uploadRelayDes": "有効にすると、ユーザーのアップロードリクエストはCloudreveを経由してストレージ側に転送されます。チャンクアップロードができないため、Webサーバー側の最大アップロードサイズ制限を調整してください。",

@@ -818,6 +818,8 @@
   "nativeThumbNailsGeneralUpyun": "Upyun 스토리지의 경우 <0>이미지 처리</0> 서비스를 사용하여 썸네일을 생성합니다.",
   "preallocate": "하드 디스크 공간 사전 할당",
   "preallocateDes": "활성화하면 사용자가 파일을 업로드할 때 하드 디스크 공간을 미리 할당하며, Linux 또는 Darwin에서만 유효합니다.",
+  "chunkConcurrency": "병렬 청크 업로드 수",
+  "chunkConcurrencyDes": "웹 직접 업로드 시 동시 진행되는 청크 업로드 수를 설정합니다.",
   "sourceWebEdit": "웹 온라인 편집",
   "uploadRelay": "업로드 중계",
   "uploadRelayDes": "활성화하면 사용자의 업로드 요청이 Cloudreve를 통해 스토리지 측으로 중계되며, 분할 업로드를 수행할 수 없으므로 웹 서버의 최대 업로드 크기 제한을 적절히 조정하세요.",

@@ -817,6 +817,8 @@
   "nativeThumbNailsGeneralUpyun": "Para armazenamento Upyun, o serviço de <0>processamento de imagem</0> será usado para gerar miniaturas.",
   "preallocate": "Pré-alocar espaço em disco",
   "preallocateDes": "Quando habilitado, a solicitação de upload do usuário pré-alocará espaço em disco no nó de armazenamento, eficaz apenas no Linux ou Darwin.",
+  "chunkConcurrency": "Uploads de chunk concorrentes",
+  "chunkConcurrencyDes": "Define o número de uploads de chunk concorrentes ao usar upload direto da web.",
   "sourceWebEdit": "Edição online na Web",
   "uploadRelay": "Relay de upload",
   "uploadRelayDes": "Se habilitado, as solicitações de upload dos usuários serão retransmitidas para o nó de armazenamento via Cloudreve, devido à incapacidade de realizar uploads fragmentados, ajuste o limite de tamanho máximo de upload do servidor web adequadamente.",

@@ -819,6 +819,8 @@
   "nativeThumbNailsGeneralUpyun": "Для хранилища Upyun будет использоваться служба <0>обработки изображений</0> для генерации миниатюр.",
   "preallocate": "Предварительное выделение дискового пространства",
   "preallocateDes": "При включении дисковое пространство будет предварительно выделяться при загрузке файлов пользователями. Действует только в Linux или Darwin.",
+  "chunkConcurrency": "Параллельная загрузка фрагментов",
+  "chunkConcurrencyDes": "Устанавливает количество параллельных загрузок фрагментов при прямой веб-загрузке.",
   "sourceWebEdit": "Веб-редактирование онлайн",
   "uploadRelay": "Ретрансляция загрузки",
   "uploadRelayDes": "При включении запросы загрузки пользователей будут ретранслироваться через Cloudreve в хранилище. Поскольку фрагментированная загрузка невозможна, обратите внимание на настройку максимального размера загрузки веб-сервера.",

@@ -818,6 +818,8 @@
   "nativeThumbNailsGeneralUpyun": "对于又拍云存储,<0>图片处理</0>服务会被用来生成缩略图。",
   "preallocate": "预分配硬盘空间",
   "preallocateDes": "开启后,用户上传文件时会预先分配硬盘空间,只在 Linux 或 Darwin 下有效。",
+  "chunkConcurrency": "并行上传分片数",
+  "chunkConcurrencyDes": "设定 Web 端直传时,同时进行的分片上传数量。",
   "sourceWebEdit": "Web 在线编辑",
   "uploadRelay": "中转上传",
   "uploadRelayDes": "开启后,用户的上传请求会通过 Cloudreve 中转到存储端,因为无法进行分片上传,请注意调整 Web 服务器端最大上传大小限制。",

@@ -818,6 +818,8 @@
   "nativeThumbNailsGeneralUpyun": "對於又拍雲端儲存,<0>圖片處理</0>服務會被用來生成縮圖。",
   "preallocate": "預分配硬碟空間",
   "preallocateDes": "開啟後,使用者上傳檔案時會預先分配硬碟空間,只在 Linux 或 Darwin 下有效。",
+  "chunkConcurrency": "並行上傳分片數",
+  "chunkConcurrencyDes": "設定 Web 端直傳時,同時進行的分片上傳數量。",
   "sourceWebEdit": "Web 線上編輯",
   "uploadRelay": "中轉上傳",
   "uploadRelayDes": "開啟後,使用者的上傳請求會通過 Cloudreve 中轉到儲存端,因為無法進行分片上傳,請注意調整 Web 伺服器端最大上傳大小限制。",

@@ -234,6 +234,7 @@ export interface PolicySetting {
   use_cname?: boolean;
   source_auth?: boolean;
   qiniu_upload_cdn?: boolean;
+  chunk_concurrency?: number;
 }

 export interface User extends CommonMixin {

@@ -115,6 +115,7 @@ export interface StoragePolicy {
   max_size: number;
   type: PolicyType;
   relay?: boolean;
+  chunk_concurrency?: number;
 }

 export interface PaginationResults {

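Both interfaces treat the new field as optional, so existing policies without the setting keep their current behavior. Below is a minimal sketch of how such an optional value might be resolved to an effective worker count; effectiveChunkConcurrency and its constant are illustrative names, not part of this commit (the committed uploader does the equivalent in getConcurrency(), shown further down).

const DEFAULT_CHUNK_CONCURRENCY = 1;

// Illustrative helper: fall back to a single concurrent chunk when the policy
// does not specify a usable value.
function effectiveChunkConcurrency(policy: { chunk_concurrency?: number }): number {
  const n = policy.chunk_concurrency;
  return n && n > 1 ? Math.floor(n) : DEFAULT_CHUNK_CONCURRENCY;
}

// effectiveChunkConcurrency({})                       -> 1
// effectiveChunkConcurrency({ chunk_concurrency: 4 }) -> 4
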
@@ -188,6 +188,21 @@ const StorageAndUploadSection = () => {
     [setPolicy],
   );

+  const onChunkConcurrencyChange = useCallback(
+    (e: React.ChangeEvent<HTMLInputElement>) => {
+      let value: number | undefined = parseInt(e.target.value, 10);
+      if (isNaN(value) || value <= 1) {
+        value = undefined;
+      }
+
+      setPolicy((p: StoragePolicy) => ({
+        ...p,
+        settings: { ...p.settings, chunk_concurrency: value },
+      }));
+    },
+    [setPolicy],
+  );
+
   return (
     <SettingSection>
       <Typography variant="h6" gutterBottom>

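The handler stores the setting only when it is meaningful: any input that does not parse to an integer greater than 1 is written back as undefined, so the uploader falls back to its default of one concurrent chunk. The same rule as a standalone, hypothetical helper with example inputs (the component inlines this logic, it does not export such a function):

function normalizeChunkConcurrency(raw: string): number | undefined {
  const value = parseInt(raw, 10);
  return isNaN(value) || value <= 1 ? undefined : value;
}

// normalizeChunkConcurrency("")  -> undefined
// normalizeChunkConcurrency("1") -> undefined
// normalizeChunkConcurrency("4") -> 4
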
@@ -347,6 +362,29 @@ const StorageAndUploadSection = () => {
             </FormControl>
           </SettingForm>
         )}
+        {(values.type === PolicyType.s3 ||
+          values.type === PolicyType.ks3 ||
+          values.type === PolicyType.cos ||
+          values.type === PolicyType.obs ||
+          values.type === PolicyType.oss ||
+          values.type === PolicyType.qiniu) && (
+          <SettingForm lgWidth={5} title={t("policy.chunkConcurrency")}>
+            <FormControl fullWidth>
+              <DenseFilledTextField
+                value={values.settings?.chunk_concurrency ?? 1}
+                onChange={onChunkConcurrencyChange}
+                slotProps={{
+                  htmlInput: {
+                    type: "number",
+                    min: 1,
+                    max: 10,
+                  },
+                }}
+              />
+              <NoMarginHelperText>{t("policy.chunkConcurrencyDes")}</NoMarginHelperText>
+            </FormControl>
+          </SettingForm>
+        )}
         {values.type !== PolicyType.local && (
           <>
             <SettingForm lgWidth={5}>

@@ -1,5 +1,5 @@
-import Base from "./base";
 import * as utils from "../utils";
+import Base from "./base";

 export interface ChunkProgress {
   loaded: number;

@@ -13,29 +13,156 @@ export interface ChunkInfo {
 }

 export default abstract class Chunk extends Base {
-  protected chunks: Blob[];
+  protected chunks: Blob[] = [];
+  private readonly DEFAULT_CONCURRENCY = 1; // Default concurrent uploads
+  private readonly MAX_RETRIES = 3;
+  private progressUpdateMutex = Promise.resolve(); // Ensure progress updates are serialized

   public upload = async () => {
     this.logger.info("Preparing uploading file chunks.");
     this.initBeforeUploadChunks();

-    this.logger.info("Starting uploading file chunks:", this.chunks);
+    this.logger.info("Starting concurrent uploading of file chunks:", this.chunks);
     this.updateLocalCache();
-    for (let i = 0; i < this.chunks.length; i++) {
-      if (this.task.chunkProgress[i].loaded < this.chunks[i].size || this.chunks[i].size == 0) {
-        await this.uploadChunk({ chunk: this.chunks[i], index: i });
-        this.logger.info(`Chunk [${i}] uploaded.`);
-      }
-    }
+    await this.uploadChunksWithDynamicPool();
   };

+  private async uploadChunksWithDynamicPool() {
+    // Get chunks that need to be uploaded
+    const chunksToUpload = this.chunks
+      .map((chunk, index) => ({ chunk, index }))
+      .filter(({ chunk, index }) => this.task.chunkProgress[index].loaded < chunk.size || chunk.size === 0);
+
+    if (chunksToUpload.length === 0) {
+      this.logger.info("All chunks already uploaded, skipping.");
+      return;
+    }
+
+    this.logger.info(`Found ${chunksToUpload.length} chunks to upload out of ${this.chunks.length} total chunks.`);
+
+    const concurrency = this.getConcurrency();
+    let chunkIndex = 0;
+    let activeCount = 0;
+    let hasError = false;
+    let firstError: any = null;
+
+    // Helper function to start a new upload with immediate callback
+    const startUpload = (chunkInfo: { chunk: Blob; index: number }): Promise<void> => {
+      // Don't start new uploads if there's already an error
+      if (hasError) {
+        return Promise.resolve();
+      }
+
+      activeCount++;
+      this.logger.info(`Starting chunk [${chunkInfo.index}], active uploads: ${activeCount}`);
+
+      return this.uploadChunkWithRetryAndCallback(chunkInfo, () => {
+        this.updateLocalCache();
+      })
+        .then(() => {
+          activeCount--;
+          this.logger.info(`Chunk [${chunkInfo.index}] completed successfully, active uploads: ${activeCount}`);
+
+          // Start next chunk immediately if available and no error occurred
+          if (!hasError && chunkIndex < chunksToUpload.length) {
+            startUpload(chunksToUpload[chunkIndex++]);
+          }
+        })
+        .catch((error) => {
+          activeCount--;
+          this.logger.error(`Chunk [${chunkInfo.index}] failed, stopping all uploads:`, error);
+
+          // Mark error state to stop new uploads
+          hasError = true;
+          if (!firstError) {
+            firstError = error;
+          }
+
+          // Cancel all remaining uploads
+          this.cancelToken.cancel();
+        });
+    };
+
+    // Start initial uploads up to concurrency limit
+    const initialPromises: Promise<void>[] = [];
+    while (chunkIndex < chunksToUpload.length && initialPromises.length < concurrency) {
+      initialPromises.push(startUpload(chunksToUpload[chunkIndex++]));
+    }
+
+    // Wait for all uploads to complete or fail
+    await Promise.allSettled(initialPromises);
+
+    // Wait for any remaining uploads started by callbacks to finish
+    while (activeCount > 0) {
+      await new Promise((resolve) => setTimeout(resolve, 100)); // Small delay to check active count
+    }
+
+    // Throw error immediately if any chunk failed
+    if (firstError) {
+      this.logger.error("Upload process stopped due to chunk failure");
+      throw firstError;
+    }
+
+    this.logger.info("All chunks uploaded successfully.");
+  }
+
+  private async uploadChunkWithRetryAndCallback(
+    chunkInfo: ChunkInfo,
+    onComplete: () => void,
+    retryCount = 0,
+  ): Promise<void> {
+    // Check for cancellation before attempting upload
+    if (this.cancelToken.token.reason) {
+      throw new Error("Upload cancelled by user");
+    }
+
+    try {
+      await this.uploadChunk(chunkInfo);
+      this.logger.info(`Chunk [${chunkInfo.index}] uploaded successfully.`);
+      onComplete(); // Call callback immediately after successful upload
+    } catch (error) {
+      // Don't retry if upload was cancelled
+      if (this.cancelToken.token.reason) {
+        throw error;
+      }
+
+      if (retryCount < this.MAX_RETRIES) {
+        this.logger.warn(
+          `Chunk [${chunkInfo.index}] upload failed, retrying (${retryCount + 1}/${this.MAX_RETRIES}):`,
+          error,
+        );
+
+        // Wait with exponential backoff, but check for cancellation
+        const delay = Math.pow(2, retryCount) * 1000;
+        await new Promise((resolve, reject) => {
+          const timeout = setTimeout(resolve, delay);
+
+          // Cancel the timeout if upload is cancelled
+          this.cancelToken.token.promise?.then(() => {
+            clearTimeout(timeout);
+            reject(new Error("Upload cancelled during retry delay"));
+          });
+        });
+
+        await this.uploadChunkWithRetryAndCallback(chunkInfo, onComplete, retryCount + 1);
+      } else {
+        this.logger.error(`Chunk [${chunkInfo.index}] upload failed after ${this.MAX_RETRIES} retries:`, error);
+        throw error;
+      }
+    }
+  }
+
+  private getConcurrency(): number {
+    return this.task.policy.chunk_concurrency || this.DEFAULT_CONCURRENCY;
+  }
+
   private initBeforeUploadChunks() {
     this.chunks = utils.getChunks(this.task.file, this.task.session?.chunk_size);
     const cachedInfo = utils.getResumeCtx(this.task, this.logger);
     if (cachedInfo == null) {
       this.task.chunkProgress = this.chunks.map(
-        (value, index): ChunkProgress => ({
+        (_, index): ChunkProgress => ({
           loaded: 0,
           index,
         }),

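The pool above keeps at most getConcurrency() chunk uploads in flight and starts the next pending chunk as soon as any active one finishes, stopping on the first error and cancelling the rest. Stripped of retries, cancellation, and resume-cache updates, the scheduling idea looks roughly like the generic sketch below; runWithConcurrency is an illustrative helper and not part of the commit.

// Run tasks with a bounded number of workers; each worker claims the next
// unclaimed task as soon as it finishes its current one.
async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, concurrency: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

Unlike this sketch, the committed version also retries failed chunks with exponential backoff, refreshes the local resume cache after each completed chunk, and rethrows the first failure once all in-flight uploads have settled.
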
@@ -45,11 +172,17 @@ export default abstract class Chunk extends Base {
     this.notifyResumeProgress();
   }

-  protected abstract async uploadChunk(chunkInfo: ChunkInfo): Promise<any>;
+  protected abstract uploadChunk(chunkInfo: ChunkInfo): Promise<any>;

-  protected updateChunkProgress(loaded: number, index: number) {
-    this.task.chunkProgress[index].loaded = loaded;
-    this.notifyResumeProgress();
+  protected updateChunkProgress(loaded: number, index: number, etag?: string) {
+    // Serialize progress updates to avoid race conditions in concurrent uploads
+    this.progressUpdateMutex = this.progressUpdateMutex.then(async () => {
+      this.task.chunkProgress[index].loaded = loaded;
+      if (etag) {
+        this.task.chunkProgress[index].etag = etag;
+      }
+      this.notifyResumeProgress();
+    });
   }

   private notifyResumeProgress() {

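updateChunkProgress now appends every update to progressUpdateMutex, so progress writes coming from several concurrently uploading chunks are applied one at a time instead of racing. A self-contained sketch of that promise-chain pattern, with an illustrative class that is not part of the commit:

// Each call appends its update to a single promise chain; the catch keeps one
// failed update from blocking every later one.
class SerializedState {
  private loadedByIndex: Record<number, number> = {};
  private chain: Promise<void> = Promise.resolve();

  update(index: number, loaded: number): Promise<void> {
    this.chain = this.chain
      .then(() => {
        this.loadedByIndex[index] = loaded;
      })
      .catch(() => {});
    return this.chain;
  }
}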