This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new feca1cd2b7 feat: cap number of concurrent file uploads (#3735)
feca1cd2b7 is described below
commit feca1cd2b727a28cecd85ede272d9c0f94736889
Author: Xuan Gu <[email protected]>
AuthorDate: Sun Sep 28 19:21:33 2025 -0700
feat: cap number of concurrent file uploads (#3735)
---
core/config/src/main/resources/default.conf | 5 +-
.../admin/settings/admin-settings.component.html | 12 ++
.../admin/settings/admin-settings.component.ts | 17 +-
.../dataset-detail.component.html | 118 +++++++----
.../dataset-detail.component.scss | 19 +-
.../dataset-detail.component.ts | 216 ++++++++++++++-------
...user-dataset-staged-objects-list.component.html | 1 -
7 files changed, 270 insertions(+), 118 deletions(-)
diff --git a/core/config/src/main/resources/default.conf b/core/config/src/main/resources/default.conf
index 548a4882d3..5df68ecbfc 100644
--- a/core/config/src/main/resources/default.conf
+++ b/core/config/src/main/resources/default.conf
@@ -52,11 +52,10 @@ gui {
}
dataset {
- # the file size limit for dataset upload
single_file_upload_max_size_mib = 20
+ max_number_of_concurrent_uploading_file = 3
- # the maximum number of file chunks that can be held in the memory;
- # you may increase this number if your deployment environment has enough memory resource
+ # The maximum number of file chunks that can be held in the memory
max_number_of_concurrent_uploading_file_chunks = 10
# the size of each chunk during the multipart upload of file
diff --git a/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.html b/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.html
index 8438ab7213..21f42c8a09 100644
--- a/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.html
+++ b/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.html
@@ -272,6 +272,18 @@
</div>
</ng-template>
+ <div class="settings-row">
+ <span>Concurrent Files:</span>
+ <nz-input-number
+ [(ngModel)]="maxConcurrentFiles"
+ [nzMin]="1"
+ [nzMax]="1000"
+ [nzStep]="1"
+ [nzPrecision]="0">
+ </nz-input-number>
+ </div>
+ <div class="help-text-number">Number of files that can be uploaded simultaneously. (Range: 1 - 1000)</div>
+
<div class="settings-row">
<span>File Size:</span>
<nz-input-number
diff --git a/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.ts b/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.ts
index f7d5186ec3..52a8c5d2e0 100644
--- a/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.ts
+++ b/core/gui/src/app/dashboard/component/admin/settings/admin-settings.component.ts
@@ -48,6 +48,7 @@ export class AdminSettingsComponent implements OnInit {
about_enabled: false,
};
+ maxConcurrentFiles: number = 3;
maxFileSizeMiB: number = 20;
maxConcurrentChunks: number = 10;
chunkSizeMiB: number = 50;
@@ -97,6 +98,10 @@ export class AdminSettingsComponent implements OnInit {
}
private loadDatasetSettings(): void {
+ this.adminSettingsService
+ .getSetting("max_number_of_concurrent_uploading_file")
+ .pipe(untilDestroyed(this))
+ .subscribe(value => (this.maxConcurrentFiles = parseInt(value)));
this.adminSettingsService
.getSetting("single_file_upload_max_size_mib")
.pipe(untilDestroyed(this))
@@ -203,7 +208,12 @@ export class AdminSettingsComponent implements OnInit {
}
saveDatasetSettings(): void {
- if (this.maxFileSizeMiB < 1 || this.maxConcurrentChunks < 1 || this.chunkSizeMiB < 1) {
+ if (
+ this.maxFileSizeMiB < 1 ||
+ this.maxConcurrentFiles < 1 ||
+ this.maxConcurrentChunks < 1 ||
+ this.chunkSizeMiB < 1
+ ) {
this.message.error("Please enter valid integer values.");
return;
}
@@ -217,6 +227,10 @@ export class AdminSettingsComponent implements OnInit {
}
const saveRequests = [
+ this.adminSettingsService.updateSetting(
+ "max_number_of_concurrent_uploading_file",
+ this.maxConcurrentFiles.toString()
+ ),
this.adminSettingsService.updateSetting("single_file_upload_max_size_mib",
this.maxFileSizeMiB.toString()),
this.adminSettingsService.updateSetting(
"max_number_of_concurrent_uploading_file_chunks",
@@ -235,6 +249,7 @@ export class AdminSettingsComponent implements OnInit {
resetDatasetSettings(): void {
[
+ "max_number_of_concurrent_uploading_file",
"single_file_upload_max_size_mib",
"max_number_of_concurrent_uploading_file_chunks",
"multipart_upload_chunk_size_mib",
diff --git a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html
index 622124625c..d2b01ac115 100644
--- a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html
+++ b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html
@@ -276,54 +276,90 @@
nzActive="true"
nzHeader="Create New Version">
<texera-user-files-uploader
(uploadedFiles)="onNewUploadFilesChanged($event)"> </texera-user-files-uploader>
- <div class="upload-progress-wrapper">
- <div
- *ngFor="let task of uploadTasks; trackBy: trackByTask"
- class="upload-progress-container">
- <div class="progress-header">
- <span><b>{{ task.status }}</b>: {{ task.filePath }}</span>
- <button
- nz-button
- nzType="text"
- nzShape="circle"
- [nz-tooltip]="
+
+ <nz-collapse
+ nzGhost
+ class="upload-status-panels">
+ <nz-collapse-panel
+ [nzHeader]="queuedCount > 0
+ ? ('Pending: ' + queuedCount + ' file(s)')
+ : 'Pending'">
+ <div
+ *ngIf="queuedCount > 0"
+ class="upload-progress-wrapper-pending">
+ <div
+ *ngFor="let fileName of queuedFileNames"
+ class="upload-progress-container">
+ <span>{{ fileName }}</span>
+ </div>
+ </div>
+ </nz-collapse-panel>
+ <nz-divider class="section-divider"></nz-divider>
+
+ <nz-collapse-panel
+ [nzHeader]="activeCount > 0
+ ? ('Uploading: ' + activeCount + ' file(s)')
+ : 'Uploading'">
+ <div
+ *ngIf="activeCount > 0"
+ class="upload-progress-wrapper">
+ <div
+ *ngFor="let task of uploadTasks; trackBy: trackByTask"
+ class="upload-progress-container">
+ <div class="progress-header">
+ <span><b>{{ task.status }}</b>: {{ task.filePath }}</span>
+ <button
+ nz-button
+ nzType="text"
+ nzShape="circle"
+ [nz-tooltip]="
(task.status === 'aborted' || task.status === 'finished')
? 'Close'
: 'Cancel the upload'
"
- (click)="onClickAbortUploadProgress(task)">
- <i
- nz-icon
- nzType="close"
- nzTheme="outline"></i>
- </button>
- </div>
- <div
- class="upload-stats"
- *ngIf="task.status !== 'initializing'">
- <nz-progress
- [nzPercent]="task.percentage"
- [nzStatus]="getUploadStatus(task.status)"></nz-progress>
- <nz-tag
- *ngIf="task.status === 'uploading'"
- [nzColor]="'blue'">
- <span class="fixed-width-speed">{{
formatSpeed(task.uploadSpeed) }}</span> -
- <span class="fixed-width-time">{{ formatTime(task.totalTime
?? 0) }}</span> elapsed,
- <span class="fixed-width-time">{{
formatTime(task.estimatedTimeRemaining ?? 0) }} left</span>
- </nz-tag>
+ (click)="onClickAbortUploadProgress(task)">
+ <i
+ nz-icon
+ nzType="close"
+ nzTheme="outline"></i>
+ </button>
+ </div>
+
+ <div
+ class="upload-stats"
+ *ngIf="task.status !== 'initializing'">
+ <nz-progress
+ [nzPercent]="task.percentage"
+ [nzStatus]="getUploadStatus(task.status)"></nz-progress>
+ <nz-tag
+ *ngIf="task.status === 'uploading'"
+ [nzColor]="'blue'">
+ <span class="fixed-width-speed">{{
formatSpeed(task.uploadSpeed) }}</span> -
+ <span class="fixed-width-time">{{
formatTime(task.totalTime ?? 0) }}</span> elapsed,
+ <span class="fixed-width-time">{{
formatTime(task.estimatedTimeRemaining ?? 0) }} left</span>
+ </nz-tag>
- <nz-tag *ngIf="(task.status === 'finished' || task.status ===
'aborted')">
- Upload time: {{ formatTime(task.totalTime ?? 0) }}
- </nz-tag>
+ <nz-tag *ngIf="(task.status === 'finished' || task.status
=== 'aborted')">
+ Upload time: {{ formatTime(task.totalTime ?? 0) }}
+ </nz-tag>
+ </div>
+ </div>
</div>
- </div>
- </div>
+ </nz-collapse-panel>
+ <nz-divider class="section-divider"></nz-divider>
- <texera-dataset-staged-objects-list
- [uploadTimeMap]="uploadTimeMap"
- [did]="did"
- [userMakeChangesEvent]="userMakeChanges"
-
(stagedObjectsChanged)="onStagedObjectsUpdated($event)"></texera-dataset-staged-objects-list>
+ <nz-collapse-panel
+ [nzHeader]="pendingChangesCount > 0
+ ? ('Finished: ' + pendingChangesCount + ' file(s)')
+ : 'Finished'"
+ [nzActive]="false">
+ <texera-dataset-staged-objects-list
+ [uploadTimeMap]="uploadTimeMap"
+ [did]="did"
+ [userMakeChangesEvent]="userMakeChanges"
+
(stagedObjectsChanged)="onStagedObjectsUpdated($event)"></texera-dataset-staged-objects-list>
+ </nz-collapse-panel>
+ </nz-collapse>
<div
*ngIf="userHasWriteAccess() && userHasPendingChanges"
class="version-creator">
diff --git a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.scss b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.scss
index 6e40560aa0..0790f28358 100644
--- a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.scss
+++ b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.scss
@@ -170,7 +170,9 @@ nz-select {
margin-top: 15%;
}
-.upload-progress-wrapper {
+.upload-progress-wrapper,
+.upload-progress-wrapper-pending {
+ margin-top: 5px;
max-height: 25vh;
overflow-y: auto;
padding-right: 4px;
@@ -180,6 +182,13 @@ nz-select {
margin-left: 20px;
}
+.upload-progress-wrapper-pending {
+ display: flex;
+ flex-direction: column;
+ gap: 10px;
+ max-height: 15vh;
+}
+
.version-creator {
margin-top: 20px;
padding: 40px;
@@ -233,6 +242,10 @@ nz-select {
.upload-stats {
font-size: 13px;
margin-bottom: 20px;
+ nz-progress {
+ width: 97%;
+ display: inline-block;
+ }
}
:host ::ng-deep .upload-stats .ant-tag {
@@ -250,3 +263,7 @@ nz-select {
min-width: 2ch;
text-align: right;
}
+
+.section-divider {
+ margin: 8px 0;
+}
diff --git a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
index 7b7f7947f3..a7ae1e17d8 100644
--- a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
+++ b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
@@ -85,12 +85,19 @@ export class DatasetDetailComponent implements OnInit {
public displayPreciseViewCount = false;
userHasPendingChanges: boolean = false;
+ pendingChangesCount: number = 0;
+
// Uploading setting
chunkSizeMiB: number = 50;
maxConcurrentChunks: number = 10;
private uploadSubscriptions = new Map<string, Subscription>();
uploadTimeMap = new Map<string, number>();
+ // Cap number of concurrent files uploads
+ maxConcurrentFiles: number = 3;
+ private activeUploads: number = 0;
+ private pendingQueue: Array<{ fileName: string; startUpload: () => void }> =
[];
+
versionName: string = "";
isCreatingVersion: boolean = false;
@@ -100,7 +107,6 @@ export class DatasetDetailComponent implements OnInit {
filePath: string;
}
> = [];
- private autoHideTimers: number[] = [];
@Output() userMakeChanges = new EventEmitter<void>();
@@ -329,6 +335,7 @@ export class DatasetDetailComponent implements OnInit {
onStagedObjectsUpdated(stagedObjects: DatasetStagedObject[]) {
this.userHasPendingChanges = stagedObjects.length > 0;
+ this.pendingChangesCount = stagedObjects.length;
}
onVersionSelected(version: DatasetVersion): void {
@@ -385,95 +392,157 @@ export class DatasetDetailComponent implements OnInit {
.getSetting("max_number_of_concurrent_uploading_file_chunks")
.pipe(untilDestroyed(this))
.subscribe(value => (this.maxConcurrentChunks = parseInt(value)));
+ this.adminSettingsService
+ .getSetting("max_number_of_concurrent_uploading_file")
+ .pipe(untilDestroyed(this))
+ .subscribe(value => {
+ this.maxConcurrentFiles = parseInt(value);
+ });
}
onNewUploadFilesChanged(files: FileUploadItem[]) {
if (this.did) {
- files.forEach((file, idx) => {
- // Cancel any existing upload for the same file to prevent progress
confusion
- this.uploadSubscriptions.get(file.name)?.unsubscribe();
- this.uploadSubscriptions.delete(file.name);
- this.uploadTasks = this.uploadTasks.filter(t => t.filePath !==
file.name);
-
- // Add an initializing task placeholder to uploadTasks
- this.uploadTasks.push({
- filePath: file.name,
- percentage: 0,
- status: "initializing",
- uploadId: "",
- physicalAddress: "",
- });
- // Start multipart upload
- const subscription = this.datasetService
- .multipartUpload(
- this.datasetName,
- file.name,
- file.file,
- this.chunkSizeMiB * 1024 * 1024,
- this.maxConcurrentChunks
- )
- .pipe(untilDestroyed(this))
- .subscribe({
- next: progress => {
- // Find the task
- const taskIndex = this.uploadTasks.findIndex(t => t.filePath ===
file.name);
-
- if (taskIndex !== -1) {
- // Update the task with new progress info
- this.uploadTasks[taskIndex] = {
- ...this.uploadTasks[taskIndex],
- ...progress,
- percentage: progress.percentage ??
this.uploadTasks[taskIndex].percentage ?? 0,
- };
-
- // Auto‑hide when upload is truly finished
- if (progress.status === "finished" && progress.totalTime) {
- const filename = file.name.split("/").pop() || file.name;
- this.uploadTimeMap.set(filename, progress.totalTime);
+ files.forEach(file => {
+ // Check if currently uploading
+ this.cancelExistingUpload(file.name);
+
+ // Create upload function
+ const startUpload = () => {
+ this.pendingQueue = this.pendingQueue.filter(item => item.fileName
!== file.name);
+
+ // Add an initializing task placeholder to uploadTasks
+ this.uploadTasks.unshift({
+ filePath: file.name,
+ percentage: 0,
+ status: "initializing",
+ uploadId: "",
+ physicalAddress: "",
+ });
+ // Start multipart upload
+ const subscription = this.datasetService
+ .multipartUpload(
+ this.datasetName,
+ file.name,
+ file.file,
+ this.chunkSizeMiB * 1024 * 1024,
+ this.maxConcurrentChunks
+ )
+ .pipe(untilDestroyed(this))
+ .subscribe({
+ next: progress => {
+ // Find the task
+ const taskIndex = this.uploadTasks.findIndex(t => t.filePath
=== file.name);
+
+ if (taskIndex !== -1) {
+ // Update the task with new progress info
+ this.uploadTasks[taskIndex] = {
+ ...this.uploadTasks[taskIndex],
+ ...progress,
+ percentage: progress.percentage ??
this.uploadTasks[taskIndex].percentage ?? 0,
+ };
+
+ // Auto-hide when upload is truly finished
+ if (progress.status === "finished" && progress.totalTime) {
+ const filename = file.name.split("/").pop() || file.name;
+ this.uploadTimeMap.set(filename, progress.totalTime);
+ this.userMakeChanges.emit();
+ this.scheduleHide(taskIndex);
+ this.onUploadComplete();
+ }
+ }
+ },
+ error: () => {
+ // Handle upload error
+ const taskIndex = this.uploadTasks.findIndex(t => t.filePath
=== file.name);
+
+ if (taskIndex !== -1) {
+ this.uploadTasks[taskIndex] = {
+ ...this.uploadTasks[taskIndex],
+ percentage: 100,
+ status: "aborted",
+ };
+ this.scheduleHide(taskIndex);
+ }
+ this.onUploadComplete();
+ },
+ complete: () => {
+ const taskIndex = this.uploadTasks.findIndex(t => t.filePath
=== file.name);
+ if (taskIndex !== -1 && this.uploadTasks[taskIndex].status !==
"finished") {
+ this.uploadTasks[taskIndex].status = "finished";
this.userMakeChanges.emit();
this.scheduleHide(taskIndex);
+ this.onUploadComplete();
}
- }
- },
- error: () => {
- // Handle upload error
- const taskIndex = this.uploadTasks.findIndex(t => t.filePath ===
file.name);
-
- if (taskIndex !== -1) {
- this.uploadTasks[taskIndex] = {
- ...this.uploadTasks[taskIndex],
- percentage: 100,
- status: "aborted",
- };
- this.scheduleHide(taskIndex);
- }
- },
- complete: () => {
- const taskIndex = this.uploadTasks.findIndex(t => t.filePath ===
file.name);
- if (taskIndex !== -1 && this.uploadTasks[taskIndex].status !==
"finished") {
- this.uploadTasks[taskIndex].status = "finished";
- this.userMakeChanges.emit();
- this.scheduleHide(taskIndex);
- }
- },
- });
- // Store the subscription for later cleanup
- this.uploadSubscriptions.set(file.name, subscription);
+ },
+ });
+ // Store the subscription for later cleanup
+ this.uploadSubscriptions.set(file.name, subscription);
+ };
+
+ // Queue management
+ if (this.activeUploads < this.maxConcurrentFiles) {
+ this.activeUploads++;
+ startUpload();
+ } else {
+ this.pendingQueue.push({ fileName: file.name, startUpload });
+ }
});
}
}
- // Hide a task row after 5s (stores timer to clear on destroy) and clean up
its subscription
+ private cancelExistingUpload(fileName: string): void {
+ const isUploading = this.uploadTasks.some(
+ t => t.filePath === fileName && (t.status === "uploading" || t.status
=== "initializing")
+ );
+ this.uploadSubscriptions.get(fileName)?.unsubscribe();
+ this.uploadSubscriptions.delete(fileName);
+ this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== fileName);
+
+ // Process next in queue if this was active
+ if (isUploading) {
+ this.onUploadComplete();
+ }
+ // Remove from pending queue if present
+ this.pendingQueue = this.pendingQueue.filter(item => item.fileName !==
fileName);
+ }
+
+ private processNextQueuedUpload(): void {
+ if (this.pendingQueue.length > 0 && this.activeUploads <
this.maxConcurrentFiles) {
+ const next = this.pendingQueue.shift();
+ if (next) {
+ this.activeUploads++;
+ next.startUpload();
+ }
+ }
+ }
+
+ private onUploadComplete(): void {
+ this.activeUploads--;
+ this.processNextQueuedUpload();
+ }
+
+ get queuedFileNames(): string[] {
+ return this.pendingQueue.map(item => item.fileName);
+ }
+
+ get queuedCount(): number {
+ return this.pendingQueue.length;
+ }
+
+ get activeCount(): number {
+ return this.activeUploads;
+ }
+
+ // Hide a task row after 5s
private scheduleHide(idx: number) {
if (idx === -1) {
return;
}
const key = this.uploadTasks[idx].filePath;
this.uploadSubscriptions.delete(key);
- const handle = window.setTimeout(() => {
+ setTimeout(() => {
this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== key);
}, 5000);
- this.autoHideTimers.push(handle);
}
onClickAbortUploadProgress(task: MultipartUploadProgress & { filePath:
string }) {
@@ -482,6 +551,11 @@ export class DatasetDetailComponent implements OnInit {
subscription.unsubscribe();
this.uploadSubscriptions.delete(task.filePath);
}
+
+ if (task.status === "uploading" || task.status === "initializing") {
+ this.onUploadComplete();
+ }
+
this.datasetService
.finalizeMultipartUpload(
this.datasetName,
diff --git a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-staged-objects-list/user-dataset-staged-objects-list.component.html b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-staged-objects-list/user-dataset-staged-objects-list.component.html
index 5b1dece350..a1820dcecf 100644
--- a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-staged-objects-list/user-dataset-staged-objects-list.component.html
+++ b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-staged-objects-list/user-dataset-staged-objects-list.component.html
@@ -19,7 +19,6 @@
<div class="staged-object-list-container">
<nz-list
- nzBordered
nzSize="small"
*ngIf="datasetStagedObjects.length > 0">
<nz-list-item *ngFor="let obj of datasetStagedObjects">