This is an automated email from the ASF dual-hosted git repository.

aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 830e3090b1 feat(backend): Resumable Uploads (#4181)
830e3090b1 is described below

commit 830e3090b1b8a419f294a008a01a153259a64b63
Author: carloea2 <[email protected]>
AuthorDate: Wed Feb 4 21:51:42 2026 -0800

    feat(backend): Resumable Uploads (#4181)
---
 .../texera/service/util/S3StorageClient.scala      |   1 +
 .../texera/service/resource/DatasetResource.scala  | 339 ++++++++----
 .../service/resource/DatasetResourceSpec.scala     | 581 ++++++++++++++++++++-
 frontend/src/app/app.module.ts                     |   2 +
 .../conflicting-file-modal-content.component.html  |  26 ++
 .../conflicting-file-modal-content.component.scss  |  22 +
 .../conflicting-file-modal-content.component.ts    |  37 ++
 .../files-uploader/files-uploader.component.ts     | 214 ++++++--
 .../dataset-detail.component.ts                    |  31 +-
 .../service/user/dataset/dataset.service.ts        |  91 ++--
 .../app/dashboard/type/dashboard-file.interface.ts |   1 +
 sql/texera_ddl.sql                                 |   1 +
 sql/updates/20.sql                                 |  37 ++
 13 files changed, 1167 insertions(+), 216 deletions(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index b7a66a1bc8..f3d252d413 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -38,6 +38,7 @@ import scala.jdk.CollectionConverters._
 object S3StorageClient {
   val MINIMUM_NUM_OF_MULTIPART_S3_PART: Long = 5L * 1024 * 1024 // 5 MiB
   val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000
+  val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6
 
   // Initialize MinIO-compatible S3 Client
   private lazy val s3Client: S3Client = {
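
The new constant caps how long a presigned physical address is trusted before a
session counts as stale. A minimal sketch of the expiry test applied later in
this patch (the standalone object is illustrative; names mirror the diff):

    import java.time.OffsetDateTime

    object ExpiryCheck {
      val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6

      // A session created at `createdAt` is expired once the window has elapsed.
      def isExpired(createdAt: OffsetDateTime): Boolean =
        createdAt
          .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
          .isBefore(OffsetDateTime.now(createdAt.getOffset))
    }
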
diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index f39885367c..a60bc07adf 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -51,11 +51,12 @@ import org.apache.texera.service.resource.DatasetResource.{context, _}
 import org.apache.texera.service.util.S3StorageClient
 import org.apache.texera.service.util.S3StorageClient.{
   MAXIMUM_NUM_OF_MULTIPART_S3_PARTS,
-  MINIMUM_NUM_OF_MULTIPART_S3_PART
+  MINIMUM_NUM_OF_MULTIPART_S3_PART,
+  PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
 }
 import org.jooq.impl.DSL
 import org.jooq.impl.DSL.{inline => inl}
-import org.jooq.{DSLContext, EnumType}
+import org.jooq.{DSLContext, EnumType, Record2, Result}
 
 import java.io.{InputStream, OutputStream}
 import java.net.{HttpURLConnection, URI, URL, URLDecoder}
@@ -73,8 +74,10 @@ import org.jooq.exception.DataAccessException
 import software.amazon.awssdk.services.s3.model.UploadPartResponse
 import org.apache.commons.io.FilenameUtils
 import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
+import org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
 
 import java.sql.SQLException
+import java.time.OffsetDateTime
 import scala.util.Try
 
 object DatasetResource {
@@ -693,14 +696,16 @@ class DatasetResource {
       @QueryParam("filePath") filePath: String,
       @QueryParam("fileSizeBytes") fileSizeBytes: Optional[java.lang.Long],
       @QueryParam("partSizeBytes") partSizeBytes: Optional[java.lang.Long],
+      @QueryParam("restart") restart: Optional[java.lang.Boolean],
       @Auth user: SessionUser
   ): Response = {
     val uid = user.getUid
     val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)
 
     operationType.toLowerCase match {
+      case "list" => listMultipartUploads(dataset.getDid, uid)
       case "init" =>
-        initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, uid)
+        initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, restart, uid)
       case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
       case "abort"  => abortMultipartUpload(dataset.getDid, filePath, uid)
       case _ =>
@@ -1488,11 +1493,38 @@ class DatasetResource {
     dataset
   }
 
+  private def listMultipartUploads(did: Integer, requesterUid: Int): Response = {
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, requesterUid)) {
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+      }
+
+      val filePaths =
+        ctx
+          .selectDistinct(DATASET_UPLOAD_SESSION.FILE_PATH)
+          .from(DATASET_UPLOAD_SESSION)
+          .where(DATASET_UPLOAD_SESSION.DID.eq(did))
+          .and(
+            DSL.condition(
+              "created_at > current_timestamp - (? * interval '1 hour')",
+              PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
+            )
+          )
+          .orderBy(DATASET_UPLOAD_SESSION.FILE_PATH.asc())
+          .fetch(DATASET_UPLOAD_SESSION.FILE_PATH)
+          .asScala
+          .toList
+
+      Response.ok(Map("filePaths" -> filePaths.asJava)).build()
+    }
+  }
+
   private def initMultipartUpload(
       did: Integer,
       encodedFilePath: String,
       fileSizeBytes: Optional[java.lang.Long],
       partSizeBytes: Optional[java.lang.Long],
+      restart: Optional[java.lang.Boolean],
       uid: Integer
   ): Response = {
 
@@ -1509,27 +1541,17 @@ class DatasetResource {
           URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
         )
 
-      val fileSizeBytesValue: Long =
-        fileSizeBytes
-          .orElseThrow(() =>
-            new BadRequestException("fileSizeBytes is required for initialization")
-          )
-
-      if (fileSizeBytesValue <= 0L) {
-        throw new BadRequestException("fileSizeBytes must be > 0")
-      }
+      if (fileSizeBytes == null || !fileSizeBytes.isPresent)
+        throw new BadRequestException("fileSizeBytes is required for 
initialization")
+      if (partSizeBytes == null || !partSizeBytes.isPresent)
+        throw new BadRequestException("partSizeBytes is required for 
initialization")
 
-      val partSizeBytesValue: Long =
-        partSizeBytes
-          .orElseThrow(() =>
-            new BadRequestException("partSizeBytes is required for initialization")
-          )
+      val fileSizeBytesValue: Long = fileSizeBytes.get.longValue()
+      val partSizeBytesValue: Long = partSizeBytes.get.longValue()
 
-      if (partSizeBytesValue <= 0L) {
-        throw new BadRequestException("partSizeBytes must be > 0")
-      }
+      if (fileSizeBytesValue <= 0L) throw new BadRequestException("fileSizeBytes must be > 0")
+      if (partSizeBytesValue <= 0L) throw new BadRequestException("partSizeBytes must be > 0")
 
-      // singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size)
       val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
       if (totalMaxBytes <= 0L) {
         throw new WebApplicationException(
@@ -1543,7 +1565,6 @@ class DatasetResource {
         )
       }
 
-      // Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize
       val addend: Long = partSizeBytesValue - 1L
       if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) {
         throw new WebApplicationException(
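
The hunk above keeps the overflow guard for the part-count computation,
numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize.
A self-contained sketch with a worked example (the helper name is illustrative):

    object PartMath {
      // Ceiling division with an explicit guard against Long overflow
      // when the addend (partSize - 1) is applied.
      def computeNumParts(fileSizeBytes: Long, partSizeBytes: Long): Long = {
        val addend = partSizeBytes - 1L
        require(addend >= 0L && fileSizeBytes <= Long.MaxValue - addend, "numParts overflow")
        (fileSizeBytes + addend) / partSizeBytes
      }
    }

    // e.g. a file of 10 MiB + 1 byte with 5 MiB parts needs 3 parts:
    // PartMath.computeNumParts(10L * 1024 * 1024 + 1, 5L * 1024 * 1024) == 3
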
@@ -1558,111 +1579,229 @@ class DatasetResource {
           s"Computed numParts=$numPartsLong is out of range 
1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
         )
       }
-      val numPartsValue: Int = numPartsLong.toInt
+      val computedNumParts: Int = numPartsLong.toInt
 
-      // S3 multipart constraint: all non-final parts must be >= 5MiB.
-      // If we have >1 parts, then partSizeBytesValue is the non-final part size.
-      if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
+      if (computedNumParts > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
         throw new BadRequestException(
           s"partSizeBytes=$partSizeBytesValue is too small. " +
             s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART 
bytes."
         )
       }
-
-      // Reject if a session already exists
-      val exists = ctx.fetchExists(
-        ctx
-          .selectOne()
-          .from(DATASET_UPLOAD_SESSION)
+      var session: DatasetUploadSessionRecord = null
+      var rows: Result[Record2[Integer, String]] = null
+      try {
+        session = ctx
+          .selectFrom(DATASET_UPLOAD_SESSION)
           .where(
             DATASET_UPLOAD_SESSION.UID
               .eq(uid)
               .and(DATASET_UPLOAD_SESSION.DID.eq(did))
               .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
           )
-      )
-      if (exists) {
-        throw new WebApplicationException(
-          "Upload already in progress for this filePath",
-          Response.Status.CONFLICT
-        )
-      }
-
-      val presign = withLakeFSErrorHandling {
-        LakeFSStorageClient.initiatePresignedMultipartUploads(
-          repositoryName,
-          filePath,
-          numPartsValue
-        )
+          .forUpdate()
+          .noWait()
+          .fetchOne()
+        if (session != null) {
+          // Lock this session's part rows
+          rows = ctx
+            .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+            .from(DATASET_UPLOAD_SESSION_PART)
+            .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(session.getUploadId))
+            .forUpdate()
+            .noWait()
+            .fetch()
+          val dbFileSize = session.getFileSizeBytes
+          val dbPartSize = session.getPartSizeBytes
+          val dbNumParts = session.getNumPartsRequested
+          val createdAt: OffsetDateTime = session.getCreatedAt
+
+          val isExpired =
+            createdAt
+              .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
+              .isBefore(OffsetDateTime.now(createdAt.getOffset))
+
+          val conflictConfig =
+            dbFileSize != fileSizeBytesValue ||
+              dbPartSize != partSizeBytesValue ||
+              dbNumParts != computedNumParts ||
+              isExpired ||
+              Option(restart).exists(_.orElse(false))
+
+          if (conflictConfig) {
+            // Parts will be deleted automatically (ON DELETE CASCADE)
+            ctx
+              .deleteFrom(DATASET_UPLOAD_SESSION)
+              .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId))
+              .execute()
+
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                session.getUploadId,
+                session.getPhysicalAddress
+              )
+            } catch { case _: Throwable => () }
+            session = null
+            rows = null
+          }
+        }
+      } catch {
+        case e: DataAccessException
+            if Option(e.getCause)
+              .collect { case s: SQLException => s.getSQLState }
+              .contains("55P03") =>
+          throw new WebApplicationException(
+            "Another client is uploading this file",
+            Response.Status.CONFLICT
+          )
       }
 
-      val uploadIdStr = presign.getUploadId
-      val physicalAddr = presign.getPhysicalAddress
-
-      try {
-        val rowsInserted = ctx
-          .insertInto(DATASET_UPLOAD_SESSION)
-          .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
-          .set(DATASET_UPLOAD_SESSION.DID, did)
-          .set(DATASET_UPLOAD_SESSION.UID, uid)
-          .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
-          .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
-          .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
-          .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
-          .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
-          .onDuplicateKeyIgnore()
-          .execute()
-
-        if (rowsInserted != 1) {
-          LakeFSStorageClient.abortPresignedMultipartUploads(
+      if (session == null) {
+        val presign = withLakeFSErrorHandling {
+          LakeFSStorageClient.initiatePresignedMultipartUploads(
             repositoryName,
             filePath,
-            uploadIdStr,
-            physicalAddr
-          )
-          throw new WebApplicationException(
-            "Upload already in progress for this filePath",
-            Response.Status.CONFLICT
+            computedNumParts
           )
         }
 
-        // Pre-create part rows 1..numPartsValue with empty ETag.
-        // This makes per-part locking cheap and deterministic.
+        val uploadIdStr = presign.getUploadId
+        val physicalAddr = presign.getPhysicalAddress
+
+        try {
+          val rowsInserted = ctx
+            .insertInto(DATASET_UPLOAD_SESSION)
+            .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
+            .set(DATASET_UPLOAD_SESSION.DID, did)
+            .set(DATASET_UPLOAD_SESSION.UID, uid)
+            .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
+            .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
+            .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(computedNumParts))
+            .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
+            .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
+            .onDuplicateKeyIgnore()
+            .execute()
 
-        val partNumberSeries = DSL.generateSeries(1, numPartsValue).asTable("gs", "pn")
-        val partNumberField = partNumberSeries.field("pn", classOf[Integer])
+          if (rowsInserted == 1) {
+            val partNumberSeries =
+              DSL.generateSeries(1, computedNumParts).asTable("gs", "partNumberField")
+            val partNumberField = partNumberSeries.field("partNumberField", classOf[Integer])
 
-        ctx
-          .insertInto(
-            DATASET_UPLOAD_SESSION_PART,
-            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
-            DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
-            DATASET_UPLOAD_SESSION_PART.ETAG
-          )
-          .select(
             ctx
+              .insertInto(
+                DATASET_UPLOAD_SESSION_PART,
+                DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
+                DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
+                DATASET_UPLOAD_SESSION_PART.ETAG
+              )
               .select(
-                inl(uploadIdStr),
-                partNumberField,
-                inl("") // placeholder empty etag
+                ctx
+                  .select(
+                    inl(uploadIdStr),
+                    partNumberField,
+                    inl("")
+                  )
+                  .from(partNumberSeries)
               )
-              .from(partNumberSeries)
-          )
-          .execute()
+              .execute()
+
+            session = ctx
+              .selectFrom(DATASET_UPLOAD_SESSION)
+              .where(
+                DATASET_UPLOAD_SESSION.UID
+                  .eq(uid)
+                  .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                  .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+              )
+              .fetchOne()
+          } else {
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                uploadIdStr,
+                physicalAddr
+              )
+            } catch { case _: Throwable => () }
+
+            session = ctx
+              .selectFrom(DATASET_UPLOAD_SESSION)
+              .where(
+                DATASET_UPLOAD_SESSION.UID
+                  .eq(uid)
+                  .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                  .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+              )
+              .fetchOne()
+          }
+        } catch {
+          case e: Exception =>
+            try {
+              LakeFSStorageClient.abortPresignedMultipartUploads(
+                repositoryName,
+                filePath,
+                uploadIdStr,
+                physicalAddr
+              )
+            } catch { case _: Throwable => () }
+            throw e
+        }
+      }
 
-        Response.ok().build()
-      } catch {
-        case e: Exception =>
+      if (session == null) {
+        throw new WebApplicationException(
+          "Failed to create or locate upload session",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      val dbNumParts = session.getNumPartsRequested
+
+      val uploadId = session.getUploadId
+      val nParts = dbNumParts
+
+      // Lock part rows with NOWAIT; if any row is held by another uploader, return 409
+      if (rows == null) {
+        rows =
           try {
-            LakeFSStorageClient.abortPresignedMultipartUploads(
-              repositoryName,
-              filePath,
-              uploadIdStr,
-              physicalAddr
-            )
-          } catch { case _: Throwable => () }
-          throw e
+            ctx
+              .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+              .from(DATASET_UPLOAD_SESSION_PART)
+              .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+              .forUpdate()
+              .noWait()
+              .fetch()
+          } catch {
+            case e: DataAccessException
+                if Option(e.getCause)
+                  .collect { case s: SQLException => s.getSQLState }
+                  .contains("55P03") =>
+              throw new WebApplicationException(
+                "Another client is uploading parts for this file",
+                Response.Status.CONFLICT
+              )
+          }
       }
+
+      // Compute missingParts and completedPartsCount from the same query result
+      val missingParts = rows.asScala
+        .filter(r =>
+          Option(r.get(DATASET_UPLOAD_SESSION_PART.ETAG)).map(_.trim).getOrElse("").isEmpty
+        )
+        .map(r => r.get(DATASET_UPLOAD_SESSION_PART.PART_NUMBER).intValue())
+        .toList
+
+      val completedPartsCount = nParts - missingParts.size
+
+      Response
+        .ok(
+          Map(
+            "missingParts" -> missingParts.asJava,
+            "completedPartsCount" -> Integer.valueOf(completedPartsCount)
+          )
+        )
+        .build()
     }
   }
 
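
In sum, init is now idempotent and resumable: a matching session is reused and
the response reports which parts are still missing, while a mismatched,
expired, or explicitly restarted session is dropped and re-created. A
self-contained model of that reuse decision (the case class is illustrative,
not the generated jOOQ record; field names mirror dataset_upload_session):

    import java.time.OffsetDateTime

    final case class UploadSession(
        fileSizeBytes: Long,
        partSizeBytes: Long,
        numPartsRequested: Int,
        createdAt: OffsetDateTime
    )

    object InitDecision {
      val ExpirationHrs = 6L // PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS

      // True when the existing session must be discarded and re-created.
      def mustRestart(
          s: UploadSession,
          fileSize: Long,
          partSize: Long,
          numParts: Int,
          restartRequested: Boolean
      ): Boolean = {
        val expired = s.createdAt
          .plusHours(ExpirationHrs)
          .isBefore(OffsetDateTime.now(s.createdAt.getOffset))
        s.fileSizeBytes != fileSize ||
        s.partSizeBytes != partSize ||
        s.numPartsRequested != numParts ||
        expired ||
        restartRequested
      }
    }
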
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index 0d37298e9e..c03a6d4cb6 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -32,6 +32,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
 import org.apache.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetVersionDao, UserDao}
 import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersion, User}
 import org.apache.texera.service.MockLakeFS
+import org.apache.texera.service.util.S3StorageClient
 import org.jooq.SQLDialect
 import org.jooq.impl.DSL
 import org.scalatest.flatspec.AnyFlatSpec
@@ -62,6 +63,51 @@ class DatasetResourceSpec
     with BeforeAndAfterAll
     with BeforeAndAfterEach {
 
+  // ---------- Response entity helpers ----------
+  private def entityAsScalaMap(resp: Response): Map[String, Any] = {
+    resp.getEntity match {
+      case m: scala.collection.Map[_, _] =>
+        m.asInstanceOf[scala.collection.Map[String, Any]].toMap
+      case m: java.util.Map[_, _] =>
+        m.asScala.toMap.asInstanceOf[Map[String, Any]]
+      case null => Map.empty
+      case other =>
+        fail(s"Unexpected response entity type: ${other.getClass}")
+    }
+  }
+
+  private def mapListOfInts(x: Any): List[Int] =
+    x match {
+      case l: java.util.List[_]       => l.asScala.map(_.toString.toInt).toList
+      case l: scala.collection.Seq[_] => l.map(_.toString.toInt).toList
+      case other                      => fail(s"Expected list, got: ${other.getClass}")
+    }
+
+  private def mapListOfStrings(x: Any): List[String] =
+    x match {
+      case l: java.util.List[_]       => l.asScala.map(_.toString).toList
+      case l: scala.collection.Seq[_] => l.map(_.toString).toList
+      case other                      => fail(s"Expected list, got: ${other.getClass}")
+    }
+
+  private def listUploads(
+      user: SessionUser = multipartOwnerSessionUser
+  ): List[String] = {
+    val resp = datasetResource.multipartUpload(
+      "list",
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc("ignored"),
+      Optional.empty(),
+      Optional.empty(),
+      Optional.empty(),
+      user
+    )
+    resp.getStatus shouldEqual 200
+    val m = entityAsScalaMap(resp)
+    mapListOfStrings(m("filePaths"))
+  }
+
   // ---------- logging (multipart tests can be noisy) ----------
   private var savedLevels: Map[String, Level] = Map.empty
 
@@ -473,7 +519,8 @@ class DatasetResourceSpec
       numParts: Int,
       lastPartBytes: Int = 1,
       partSizeBytes: Int = MinNonFinalPartBytes,
-      user: SessionUser = multipartOwnerSessionUser
+      user: SessionUser = multipartOwnerSessionUser,
+      restart: Optional[java.lang.Boolean] = Optional.empty()
   ): Response = {
     require(numParts >= 1, "numParts must be >= 1")
     require(lastPartBytes > 0, "lastPartBytes must be > 0")
@@ -499,6 +546,24 @@ class DatasetResourceSpec
       urlEnc(filePath),
       Optional.of(java.lang.Long.valueOf(fileSizeBytes)),
       Optional.of(java.lang.Long.valueOf(maxPartSizeBytes)),
+      restart,
+      user
+    )
+  }
+  private def initRaw(
+      filePath: String,
+      fileSizeBytes: Long,
+      partSizeBytes: Long,
+      user: SessionUser = multipartOwnerSessionUser
+  ): Response = {
+    datasetResource.multipartUpload(
+      "init",
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      Optional.of(java.lang.Long.valueOf(fileSizeBytes)),
+      Optional.of(java.lang.Long.valueOf(partSizeBytes)),
+      Optional.empty(),
       user
     )
   }
@@ -514,6 +579,7 @@ class DatasetResourceSpec
       urlEnc(filePath),
       Optional.empty(),
       Optional.empty(),
+      Optional.empty(),
       user
     )
 
@@ -528,6 +594,7 @@ class DatasetResourceSpec
       urlEnc(filePath),
       Optional.empty(),
       Optional.empty(),
+      Optional.empty(),
       user
     )
 
@@ -617,6 +684,74 @@ class DatasetResourceSpec
   private def assertStatus(ex: WebApplicationException, status: Int): Unit =
     ex.getResponse.getStatus shouldEqual status
 
+  // ---------------------------------------------------------------------------
+  // LIST TESTS (type=list)
+  // ---------------------------------------------------------------------------
+  "multipart-upload?type=list" should "return empty when no active sessions exist for the dataset" in {
+    // Make deterministic: remove any leftover sessions from other tests.
+    getDSLContext
+      .deleteFrom(DATASET_UPLOAD_SESSION)
+      .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+      .execute()
+
+    listUploads() shouldBe empty
+  }
+
+  it should "reject list when caller lacks WRITE access" in {
+    val ex = intercept[ForbiddenException] {
+      listUploads(user = multipartNoWriteSessionUser)
+    }
+    ex.getResponse.getStatus shouldEqual 403
+  }
+
+  it should "return only non-expired sessions, sorted by filePath (and exclude 
expired ones)" in {
+    // Clean slate
+    getDSLContext
+      .deleteFrom(DATASET_UPLOAD_SESSION)
+      .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+      .execute()
+
+    val fpA = uniqueFilePath("list-a")
+    val fpB = uniqueFilePath("list-b")
+
+    initUpload(fpB, numParts = 2).getStatus shouldEqual 200
+    initUpload(fpA, numParts = 2).getStatus shouldEqual 200
+
+    // Expire fpB by pushing created_at back > 6 hours.
+    val uploadIdB = fetchUploadIdOrFail(fpB)
+    val tableName = DATASET_UPLOAD_SESSION.getName // typically "dataset_upload_session"
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION)
+      .set(
+        DATASET_UPLOAD_SESSION.CREATED_AT,
+        DSL.field("current_timestamp - interval '7 
hours'").cast(classOf[java.time.OffsetDateTime])
+      )
+      .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadIdB))
+      .execute()
+
+    val listed = listUploads()
+    listed shouldEqual listed.sorted
+    listed should contain(fpA)
+    listed should not contain fpB
+  }
+
+  it should "not list sessions after abort (cleanup works end-to-end)" in {
+    // Clean slate
+    getDSLContext
+      .deleteFrom(DATASET_UPLOAD_SESSION)
+      .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+      .execute()
+
+    val fp = uniqueFilePath("list-after-abort")
+    initUpload(fp, numParts = 2).getStatus shouldEqual 200
+
+    listUploads() should contain(fp)
+
+    abortUpload(fp).getStatus shouldEqual 200
+
+    listUploads() should not contain fp
+  }
+
   // ---------------------------------------------------------------------------
   // INIT TESTS
   // ---------------------------------------------------------------------------
@@ -634,6 +769,345 @@ class DatasetResourceSpec
 
     assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 3)
   }
+  it should "restart session when restart=true is explicitly requested (even 
if config is unchanged) and reset progress" in {
+    val filePath = uniqueFilePath("init-restart-true")
+
+    // Initial init
+    initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+
+    // Make progress in old session
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+    // Re-init with same config but restart=true => must restart
+    val r2 = initUpload(
+      filePath,
+      numParts = 2,
+      lastPartBytes = 123,
+      restart = Optional.of(java.lang.Boolean.TRUE)
+    )
+    r2.getStatus shouldEqual 200
+
+    val newUploadId = fetchUploadIdOrFail(filePath)
+    newUploadId should not equal oldUploadId
+
+    // Old part rows gone, new placeholders empty
+    fetchPartRows(oldUploadId) shouldBe empty
+    assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+    // Response should look like a fresh session
+    val m = entityAsScalaMap(r2)
+    mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+    m("completedPartsCount").toString.toInt shouldEqual 0
+  }
+
+  it should "not restart session when restart=false (same config) and preserve 
uploadId + progress" in {
+    val filePath = uniqueFilePath("init-restart-false")
+
+    initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200
+    val uploadId1 = fetchUploadIdOrFail(filePath)
+
+    uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
+
+    val r2 = initUpload(
+      filePath,
+      numParts = 3,
+      lastPartBytes = 123,
+      restart = Optional.of(java.lang.Boolean.FALSE)
+    )
+    r2.getStatus shouldEqual 200
+
+    val uploadId2 = fetchUploadIdOrFail(filePath)
+    uploadId2 shouldEqual uploadId1
+
+    val m = entityAsScalaMap(r2)
+    mapListOfInts(m("missingParts")) shouldEqual List(2, 3)
+    m("completedPartsCount").toString.toInt shouldEqual 1
+  }
+
+  it should "restart even when all parts were already uploaded (restart=true 
makes missingParts full again)" in {
+    val filePath = uniqueFilePath("init-restart-after-all-parts")
+
+    initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+
+    // Upload everything (but don't finish)
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte, n = 123)).getStatus shouldEqual 200
+
+    // Confirm "all done" without restart
+    val rNoRestart = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+    rNoRestart.getStatus shouldEqual 200
+    val mNoRestart = entityAsScalaMap(rNoRestart)
+    mapListOfInts(mNoRestart("missingParts")) shouldEqual Nil
+    mNoRestart("completedPartsCount").toString.toInt shouldEqual 2
+
+    // Now force restart => must reset
+    val rRestart = initUpload(
+      filePath,
+      numParts = 2,
+      lastPartBytes = 123,
+      restart = Optional.of(java.lang.Boolean.TRUE)
+    )
+    rRestart.getStatus shouldEqual 200
+
+    val newUploadId = fetchUploadIdOrFail(filePath)
+    newUploadId should not equal oldUploadId
+    fetchPartRows(oldUploadId) shouldBe empty
+    assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+    val m = entityAsScalaMap(rRestart)
+    mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+    m("completedPartsCount").toString.toInt shouldEqual 0
+  }
+
+  "multipart-upload?type=init" should "restart session when init config 
changes (fileSize/partSize/numParts) and recreate placeholders" in {
+    val filePath = uniqueFilePath("init-conflict-restart")
+
+    // First init => 2 parts
+    initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+
+    // Upload part 1 so old session isn't empty
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+    // Second init with DIFFERENT config => 3 parts now
+    val resp2 = initUpload(filePath, numParts = 3, lastPartBytes = 50)
+    resp2.getStatus shouldEqual 200
+
+    val newUploadId = fetchUploadIdOrFail(filePath)
+    newUploadId should not equal oldUploadId
+
+    // Old part rows should have been deleted via ON DELETE CASCADE
+    fetchPartRows(oldUploadId) shouldBe empty
+
+    // New placeholders should exist and be empty
+    assertPlaceholdersCreated(newUploadId, expectedParts = 3)
+
+    val m2 = entityAsScalaMap(resp2)
+    mapListOfInts(m2("missingParts")) shouldEqual List(1, 2, 3)
+    m2("completedPartsCount").toString.toInt shouldEqual 0
+  }
+
+  it should "restart session when physicalAddress has expired (created_at too 
old), even if config is unchanged" in {
+    val filePath = uniqueFilePath("init-expired-restart")
+
+    // First init (2 parts)
+    val r1 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+    r1.getStatus shouldEqual 200
+
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+    oldUploadId should not be null
+
+    // Optional: create some progress so we know it truly resets
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+    // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6)
+    val expireHrs = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
+
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION)
+      .set(
+        DATASET_UPLOAD_SESSION.CREATED_AT,
+        DSL
+          .field(s"current_timestamp - interval '${expireHrs + 1} hours'")
+          .cast(classOf[java.time.OffsetDateTime])
+      )
+      .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(oldUploadId))
+      .execute()
+
+    // Same init config again -> should restart because it's expired
+    val r2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+    r2.getStatus shouldEqual 200
+
+    val newUploadId = fetchUploadIdOrFail(filePath)
+    newUploadId should not equal oldUploadId
+
+    // Old part rows should have been deleted (ON DELETE CASCADE)
+    fetchPartRows(oldUploadId) shouldBe empty
+
+    // New placeholders should exist, empty
+    assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+    // Response should reflect a fresh session
+    val m2 = entityAsScalaMap(r2)
+    mapListOfInts(m2("missingParts")) shouldEqual List(1, 2)
+    m2("completedPartsCount").toString.toInt shouldEqual 0
+  }
+
+  it should "be resumable: repeated init with same config keeps uploadId and 
returns missingParts + completedPartsCount" in {
+    val filePath = uniqueFilePath("init-resume-same-config")
+
+    val resp1 = initUpload(filePath, numParts = 3, lastPartBytes = 123)
+    resp1.getStatus shouldEqual 200
+    val uploadId1 = fetchUploadIdOrFail(filePath)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+    val resp2 = initUpload(filePath, numParts = 3, lastPartBytes = 123)
+    resp2.getStatus shouldEqual 200
+    val uploadId2 = fetchUploadIdOrFail(filePath)
+
+    uploadId2 shouldEqual uploadId1
+
+    val m2 = entityAsScalaMap(resp2)
+    val missing = mapListOfInts(m2("missingParts"))
+    missing shouldEqual List(2, 3)
+    m2("completedPartsCount").toString.toInt shouldEqual 1
+  }
+  it should "return missingParts=[] when all parts are already uploaded 
(completedPartsCount == numParts)" in {
+    val filePath = uniqueFilePath("init-all-done")
+    initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus 
shouldEqual 200
+
+    uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(8.toByte, n = 123)).getStatus shouldEqual 200
+
+    val resp2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+    resp2.getStatus shouldEqual 200
+
+    val m2 = entityAsScalaMap(resp2)
+    mapListOfInts(m2("missingParts")) shouldEqual Nil
+    m2("completedPartsCount").toString.toInt shouldEqual 2
+  }
+  it should "return 409 CONFLICT if the upload session row is locked by 
another transaction" in {
+    val filePath = uniqueFilePath("init-session-row-locked")
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(ownerUser.getUid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      val ex = intercept[WebApplicationException] {
+        initUpload(filePath, numParts = 2)
+      }
+      ex.getResponse.getStatus shouldEqual 409
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
+
+    // lock released => init works again
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+  }
+  it should "treat normalized-equivalent paths as the same session (no 
duplicate sessions)" in {
+    val base = s"norm-${System.nanoTime()}.bin"
+    val raw = s"a/../$base" // normalizes to base
+
+    // init using traversal-ish but normalizable path
+    initUpload(raw, numParts = 1, lastPartBytes = 16, partSizeBytes = 16).getStatus shouldEqual 200
+    val uploadId1 = fetchUploadIdOrFail(base) // stored path should be normalized
+
+    // init using normalized path should hit the same session (resume)
+    val resp2 = initUpload(base, numParts = 1, lastPartBytes = 16, partSizeBytes = 16)
+    resp2.getStatus shouldEqual 200
+    val uploadId2 = fetchUploadIdOrFail(base)
+
+    uploadId2 shouldEqual uploadId1
+
+    val m2 = entityAsScalaMap(resp2)
+    mapListOfInts(m2("missingParts")) shouldEqual List(1)
+    m2("completedPartsCount").toString.toInt shouldEqual 0
+  }
+  it should "restart session when fileSizeBytes differs (single-part; 
computedNumParts unchanged)" in {
+    val filePath = uniqueFilePath("init-conflict-filesize")
+
+    val declared = 16
+    val r1 = initRaw(filePath, fileSizeBytes = declared, partSizeBytes = 32L) // numParts=1
+    r1.getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+
+    // Add progress in old session
+    uploadPart(filePath, 1, Array.fill[Byte](declared)(1.toByte)).getStatus shouldEqual 200
+
+    fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+    val r2 = initRaw(filePath, fileSizeBytes = 17L, partSizeBytes = 32L) // numParts=1 still
+    r2.getStatus shouldEqual 200
+    val newUploadId = fetchUploadIdOrFail(filePath)
+
+    newUploadId should not equal oldUploadId
+    fetchPartRows(oldUploadId) shouldBe empty // old placeholders removed
+
+    val session = fetchSession(filePath)
+    session.getFileSizeBytes shouldEqual 17L
+    session.getPartSizeBytes shouldEqual 32L
+    session.getNumPartsRequested shouldEqual 1
+
+    val m = entityAsScalaMap(r2)
+    mapListOfInts(m("missingParts")) shouldEqual List(1)
+    m("completedPartsCount").toString.toInt shouldEqual 0 // progress reset
+  }
+
+  it should "restart session when partSizeBytes differs (single-part; 
computedNumParts unchanged)" in {
+    val filePath = uniqueFilePath("init-conflict-partsize")
+
+    initRaw(filePath, fileSizeBytes = 16L, partSizeBytes = 32L).getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+
+    // Second init, same fileSize, different partSize, still 1 part
+    val r2 = initRaw(filePath, fileSizeBytes = 16L, partSizeBytes = 64L)
+    r2.getStatus shouldEqual 200
+    val newUploadId = fetchUploadIdOrFail(filePath)
+
+    newUploadId should not equal oldUploadId
+    fetchPartRows(oldUploadId) shouldBe empty
+
+    val session = fetchSession(filePath)
+    session.getFileSizeBytes shouldEqual 16L
+    session.getPartSizeBytes shouldEqual 64L
+    session.getNumPartsRequested shouldEqual 1
+
+    val m = entityAsScalaMap(r2)
+    mapListOfInts(m("missingParts")) shouldEqual List(1)
+    m("completedPartsCount").toString.toInt shouldEqual 0
+  }
+  it should "restart session when computed numParts differs (multipart -> 
single-part)" in {
+    val filePath = uniqueFilePath("init-conflict-numparts")
+
+    val partSize = MinNonFinalPartBytes.toLong // 5 MiB
+    val fileSize = partSize * 2L + 123L // => computedNumParts = 3
+
+    val r1 = initRaw(filePath, fileSizeBytes = fileSize, partSizeBytes = partSize)
+    r1.getStatus shouldEqual 200
+    val oldUploadId = fetchUploadIdOrFail(filePath)
+    fetchSession(filePath).getNumPartsRequested shouldEqual 3
+
+    // Create progress
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+    // Re-init with a partSize >= fileSize => computedNumParts becomes 1
+    val r2 = initRaw(filePath, fileSizeBytes = fileSize, partSizeBytes = fileSize)
+    r2.getStatus shouldEqual 200
+    val newUploadId = fetchUploadIdOrFail(filePath)
+
+    newUploadId should not equal oldUploadId
+    fetchPartRows(oldUploadId) shouldBe empty
+
+    val session = fetchSession(filePath)
+    session.getNumPartsRequested shouldEqual 1
+    session.getFileSizeBytes shouldEqual fileSize
+    session.getPartSizeBytes shouldEqual fileSize
+
+    val m = entityAsScalaMap(r2)
+    mapListOfInts(m("missingParts")) shouldEqual List(1)
+    m("completedPartsCount").toString.toInt shouldEqual 0
+  }
 
   it should "reject missing fileSizeBytes / partSizeBytes" in {
     val filePath1 = uniqueFilePath("init-missing-filesize")
@@ -645,6 +1119,7 @@ class DatasetResourceSpec
         urlEnc(filePath1),
         Optional.empty(),
         Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
     }
@@ -659,6 +1134,7 @@ class DatasetResourceSpec
         urlEnc(filePath2),
         Optional.of(java.lang.Long.valueOf(1L)),
         Optional.empty(),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
     }
@@ -677,6 +1153,7 @@ class DatasetResourceSpec
           urlEnc(filePath),
           Optional.of(java.lang.Long.valueOf(0L)),
           Optional.of(java.lang.Long.valueOf(1L)),
+          Optional.empty(),
           multipartOwnerSessionUser
         )
       },
@@ -692,6 +1169,7 @@ class DatasetResourceSpec
           urlEnc(filePath),
           Optional.of(java.lang.Long.valueOf(1L)),
           Optional.of(java.lang.Long.valueOf(0L)),
+          Optional.empty(),
           multipartOwnerSessionUser
         )
       },
@@ -715,6 +1193,7 @@ class DatasetResourceSpec
           urlEnc(filePathOver),
           Optional.of(java.lang.Long.valueOf(oneMiB + 1L)),
           Optional.of(java.lang.Long.valueOf(oneMiB + 1L)), // single-part
+          Optional.empty(),
           multipartOwnerSessionUser
         )
       },
@@ -730,6 +1209,7 @@ class DatasetResourceSpec
         urlEnc(filePathEq),
         Optional.of(java.lang.Long.valueOf(oneMiB)),
         Optional.of(java.lang.Long.valueOf(oneMiB)), // single-part
+        Optional.empty(),
         multipartOwnerSessionUser
       )
 
@@ -752,6 +1232,7 @@ class DatasetResourceSpec
         urlEnc(filePathEq),
         Optional.of(java.lang.Long.valueOf(max6MiB)),
         Optional.of(java.lang.Long.valueOf(partSize)),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
 
@@ -768,6 +1249,7 @@ class DatasetResourceSpec
           urlEnc(filePathOver),
           Optional.of(java.lang.Long.valueOf(max6MiB + 1L)),
           Optional.of(java.lang.Long.valueOf(partSize)),
+          Optional.empty(),
           multipartOwnerSessionUser
         )
       },
@@ -790,6 +1272,7 @@ class DatasetResourceSpec
         urlEnc(filePath),
         Optional.of(java.lang.Long.valueOf(totalMaxBytes)),
         Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
     }
@@ -821,6 +1304,7 @@ class DatasetResourceSpec
         urlEnc(filePath),
         Optional.empty(),
         Optional.empty(),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
     }
@@ -835,7 +1319,7 @@ class DatasetResourceSpec
     assertStatus(ex, 403)
   }
 
-  it should "handle init race: exactly one succeeds, one gets 409 CONFLICT" in 
{
+  it should "handle init race: concurrent init calls converge to a single 
session (both return 200)" in {
     val filePath = uniqueFilePath("init-race")
     val barrier = new CyclicBarrier(2)
 
@@ -851,31 +1335,61 @@ class DatasetResourceSpec
     val future2 = Future(callInit())
     val results = Await.result(Future.sequence(Seq(future1, future2)), 30.seconds)
 
-    val oks = results.collect { case Right(r) if r.getStatus == 200 => r }
+    // No unexpected failures
     val fails = results.collect { case Left(t) => t }
-
-    oks.size shouldEqual 1
-    fails.size shouldEqual 1
-
-    fails.head match {
-      case e: WebApplicationException => assertStatus(e, 409)
-      case other =>
-        fail(
-          s"Expected WebApplicationException(CONFLICT), got: ${other.getClass} 
/ ${other.getMessage}"
-        )
+    withClue(s"init race failures: ${fails.map(_.getMessage).mkString(", ")}") 
{
+      fails shouldBe empty
     }
 
+    // Both should be OK
+    val oks = results.collect { case Right(r) => r }
+    oks.size shouldEqual 2
+    oks.foreach(_.getStatus shouldEqual 200)
+
+    // Exactly one session row exists for this file path
     val sessionRecord = fetchSession(filePath)
     sessionRecord should not be null
+
+    // Placeholders created for expected parts
     assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 2)
+
+    // Both responses should report missingParts [1,2] and completedPartsCount 0
+    oks.foreach { r =>
+      val m = entityAsScalaMap(r)
+      mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+      m("completedPartsCount").toString.toInt shouldEqual 0
+    }
   }
 
-  it should "reject sequential double init with 409 CONFLICT" in {
-    val filePath = uniqueFilePath("init-double")
+  it should "return 409 if init cannot acquire the session row lock (NOWAIT)" 
in {
+    val filePath = uniqueFilePath("init-lock-409")
     initUpload(filePath, numParts = 2).getStatus shouldEqual 200
 
-    val ex = intercept[WebApplicationException] { initUpload(filePath, numParts = 2) }
-    assertStatus(ex, 409)
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(ownerUser.getUid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      val ex = intercept[WebApplicationException] {
+        initUpload(filePath, numParts = 2)
+      }
+      ex.getResponse.getStatus shouldEqual 409
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
   }
 
   // ---------------------------------------------------------------------------
@@ -1160,6 +1674,38 @@ class DatasetResourceSpec
     assertStatus(ex, 403)
   }
 
+  "multipart-upload/part" should "treat retries as idempotent once ETag is set 
(no overwrite on second call)" in {
+    val filePath = uniqueFilePath("part-idempotent")
+    initUpload(
+      filePath,
+      numParts = 1,
+      lastPartBytes = 16,
+      partSizeBytes = 16
+    ).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    val n = 16
+    val bytes1: Array[Byte] = Array.tabulate[Byte](n)(i => (i + 1).toByte)
+    val bytes2: Array[Byte] = Array.tabulate[Byte](n)(i => (i + 1).toByte)
+
+    uploadPart(filePath, 1, bytes1).getStatus shouldEqual 200
+    val etag1 = fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag
+
+    uploadPart(filePath, 1, bytes2).getStatus shouldEqual 200
+    val etag2 = fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag
+
+    etag2 shouldEqual etag1
+
+    finishUpload(filePath).getStatus shouldEqual 200
+
+    val repoName = multipartDataset.getRepositoryName
+    val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, "main", filePath)
+    val gotBytes = Files.readAllBytes(Paths.get(downloaded.toURI))
+
+    gotBytes.toSeq shouldEqual bytes1.toSeq
+  }
+
   // ---------------------------------------------------------------------------
   // FINISH TESTS
   // ---------------------------------------------------------------------------
@@ -1183,6 +1729,7 @@ class DatasetResourceSpec
         urlEnc(filePath),
         Optional.of(java.lang.Long.valueOf(twoMiB)),
         Optional.of(java.lang.Long.valueOf(twoMiB)),
+        Optional.empty(),
         multipartOwnerSessionUser
       )
       .getStatus shouldEqual 200
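
Both lock tests above rely on Postgres FOR UPDATE ... NOWAIT failing fast with
SQLState 55P03 (lock_not_available), which the resource maps to HTTP 409. A
minimal sketch of that mapping, assuming jOOQ wraps the driver error in a
DataAccessException as in the diff:

    import java.sql.SQLException
    import org.jooq.exception.DataAccessException

    object LockErrors {
      // True when a jOOQ failure was caused by an unacquirable row lock.
      def isLockNotAvailable(e: DataAccessException): Boolean =
        Option(e.getCause)
          .collect { case s: SQLException => s.getSQLState }
          .contains("55P03")
    }
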
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 9ddf4bbcc2..a591207f60 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -139,6 +139,7 @@ import { OverlayModule } from "@angular/cdk/overlay";
 import { HighlightSearchTermsPipe } from "./dashboard/component/user/user-workflow/user-workflow-list-item/highlight-search-terms.pipe";
 import { en_US, provideNzI18n } from "ng-zorro-antd/i18n";
 import { FilesUploaderComponent } from "./dashboard/component/user/files-uploader/files-uploader.component";
+import { ConflictingFileModalContentComponent } from "./dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component";
 import { UserDatasetComponent } from "./dashboard/component/user/user-dataset/user-dataset.component";
 import { UserDatasetVersionCreatorComponent } from "./dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-version-creator/user-dataset-version-creator.component";
 import { DatasetDetailComponent } from "./dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component";
@@ -236,6 +237,7 @@ registerLocaleData(en);
     NgbdModalAddProjectWorkflowComponent,
     NgbdModalRemoveProjectWorkflowComponent,
     FilesUploaderComponent,
+    ConflictingFileModalContentComponent,
     UserDatasetComponent,
     UserDatasetVersionCreatorComponent,
     DatasetDetailComponent,
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html
new file mode 100644
index 0000000000..1d5f8b849a
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<div>
+  <div><b>File:</b> {{ data.fileName }}</div>
+  <div><b>Path:</b> {{ data.path }}</div>
+  <div><b>Size:</b> {{ data.size }}</div>
+
+  <div class="hint">An upload session already exists for this path.</div>
+</div>
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss
new file mode 100644
index 0000000000..34b713cf44
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+.hint {
+  margin-top: 8px;
+}
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts
new file mode 100644
index 0000000000..b418929120
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { ChangeDetectionStrategy, Component, inject } from "@angular/core";
+import { NZ_MODAL_DATA } from "ng-zorro-antd/modal";
+
+export interface ConflictingFileModalData {
+  fileName: string;
+  path: string;
+  size: string;
+}
+
+@Component({
+  selector: "texera-conflicting-file-modal-content",
+  templateUrl: "./conflicting-file-modal-content.component.html",
+  styleUrls: ["./conflicting-file-modal-content.component.scss"],
+  changeDetection: ChangeDetectionStrategy.OnPush,
+})
+export class ConflictingFileModalContentComponent {
+  readonly data: ConflictingFileModalData = inject(NZ_MODAL_DATA);
+}
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
index 0fbd00a357..216b592ee0 100644
--- a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
+++ b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
@@ -17,13 +17,22 @@
  * under the License.
  */
 
-import { Component, EventEmitter, Input, Output } from "@angular/core";
-import { FileUploadItem } from "../../../type/dashboard-file.interface";
+import { Component, EventEmitter, Host, Input, Optional, Output } from "@angular/core";
+import { firstValueFrom } from "rxjs";
 import { NgxFileDropEntry } from "ngx-file-drop";
+import { NzModalRef, NzModalService } from "ng-zorro-antd/modal";
+import { FileUploadItem } from "../../../type/dashboard-file.interface";
 import { DatasetFileNode } from "../../../../common/type/datasetVersionFileTree";
 import { NotificationService } from "../../../../common/service/notification/notification.service";
 import { AdminSettingsService } from "../../../service/admin/settings/admin-settings.service";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DatasetService } from "../../../service/user/dataset/dataset.service";
+import { DatasetDetailComponent } from "../user-dataset/user-dataset-explorer/dataset-detail.component";
+import { formatSize } from "../../../../common/util/size-formatter.util";
+import {
+  ConflictingFileModalContentComponent,
+  ConflictingFileModalData,
+} from "./conflicting-file-modal-content/conflicting-file-modal-content.component";
 
 @UntilDestroy()
 @Component({
@@ -32,23 +41,23 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
   styleUrls: ["./files-uploader.component.scss"],
 })
 export class FilesUploaderComponent {
-  @Input()
-  showUploadAlert: boolean = false;
+  @Input() showUploadAlert: boolean = false;
 
-  @Output()
-  uploadedFiles = new EventEmitter<FileUploadItem[]>();
+  @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
 
   newUploadFileTreeNodes: DatasetFileNode[] = [];
 
   fileUploadingFinished: boolean = false;
-  // four types: "success", "info", "warning" and "error"
   fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
   fileUploadBannerMessage: string = "";
   singleFileUploadMaxSizeMiB: number = 20;
 
   constructor(
     private notificationService: NotificationService,
-    private adminSettingsService: AdminSettingsService
+    private adminSettingsService: AdminSettingsService,
+    private datasetService: DatasetService,
+    @Optional() @Host() private parent: DatasetDetailComponent,
+    private modal: NzModalService
   ) {
     this.adminSettingsService
       .getSetting("single_file_upload_max_size_mib")
@@ -56,42 +65,163 @@ export class FilesUploaderComponent {
       .subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
   }
 
-  hideBanner() {
+  private markForceRestart(item: FileUploadItem): void {
+    // the uploader passes restart=true to the backend init call when this flag is set
+    item.restart = true;
+  }
+
+  private askResumeOrSkip(
+    item: FileUploadItem,
+    showForAll: boolean
+  ): Promise<"resume" | "resumeAll" | "restart" | "restartAll"> {
+    return new Promise(resolve => {
+      const fileName = item.name.split("/").pop() || item.name;
+      const sizeStr = formatSize(item.file.size);
+
+      const ref: NzModalRef = this.modal.create<ConflictingFileModalContentComponent, ConflictingFileModalData>({
+        nzTitle: "Conflicting File",
+        nzMaskClosable: false,
+        nzClosable: false,
+        nzContent: ConflictingFileModalContentComponent,
+        nzData: {
+          fileName,
+          path: item.name,
+          size: sizeStr,
+        },
+        nzFooter: [
+          ...(showForAll
+            ? [
+                {
+                  label: "Restart For All",
+                  onClick: () => {
+                    resolve("restartAll");
+                    ref.destroy();
+                  },
+                },
+                {
+                  label: "Resume For All",
+                  onClick: () => {
+                    resolve("resumeAll");
+                    ref.destroy();
+                  },
+                },
+              ]
+            : []),
+          {
+            label: "Restart",
+            onClick: () => {
+              resolve("restart");
+              ref.destroy();
+            },
+          },
+          {
+            label: "Resume",
+            type: "primary",
+            onClick: () => {
+              resolve("resume");
+              ref.destroy();
+            },
+          },
+        ],
+      });
+    });
+  }
+
+  private async resolveConflicts(items: FileUploadItem[], activePaths: string[]): Promise<FileUploadItem[]> {
+    const active = new Set(activePaths ?? []);
+    const isConflict = (p: string) => active.has(p) || active.has(encodeURIComponent(p));
+
+    const showForAll = items.length > 1;
+
+    let mode: "ask" | "resumeAll" | "restartAll" = "ask";
+    const out: FileUploadItem[] = [];
+
+    await items.reduce<Promise<void>>(async (chain, item) => {
+      await chain;
+
+      if (!isConflict(item.name)) {
+        out.push(item);
+        return;
+      }
+
+      if (mode === "resumeAll") {
+        out.push(item);
+        return;
+      }
+
+      if (mode === "restartAll") {
+        this.markForceRestart(item);
+        out.push(item);
+        return;
+      }
+
+      const choice = await this.askResumeOrSkip(item, showForAll);
+
+      if (choice === "resume") out.push(item);
+
+      if (choice === "resumeAll") {
+        mode = "resumeAll";
+        out.push(item);
+      }
+
+      if (choice === "restart") {
+        this.markForceRestart(item);
+        out.push(item);
+      }
+
+      if (choice === "restartAll") {
+        mode = "restartAll";
+        this.markForceRestart(item);
+        out.push(item);
+      }
+    }, Promise.resolve());
+
+    return out;
+  }
+
+  hideBanner(): void {
     this.fileUploadingFinished = false;
   }
 
-  showFileUploadBanner(bannerType: "error" | "success" | "info" | "warning", bannerMessage: string) {
+  showFileUploadBanner(bannerType: "error" | "success" | "info" | "warning", bannerMessage: string): void {
     this.fileUploadingFinished = true;
     this.fileUploadBannerType = bannerType;
     this.fileUploadBannerMessage = bannerMessage;
   }
 
-  public fileDropped(files: NgxFileDropEntry[]) {
-    // this promise create the FileEntry from each of the NgxFileDropEntry
-    // this filePromises is used to ensure the atomicity of file upload
+  private getOwnerAndName(): { ownerEmail: string; datasetName: string } {
+    return {
+      ownerEmail: this.parent?.ownerEmail ?? "",
+      datasetName: this.parent?.datasetName ?? "",
+    };
+  }
+
+  public fileDropped(files: NgxFileDropEntry[]): void {
     const filePromises = files.map(droppedFile => {
       return new Promise<FileUploadItem | null>((resolve, reject) => {
         if (droppedFile.fileEntry.isFile) {
           const fileEntry = droppedFile.fileEntry as FileSystemFileEntry;
-          fileEntry.file(file => {
-            // Check the file size here
-            if (file.size > this.singleFileUploadMaxSizeMiB * 1024 * 1024) {
-              // If the file is too large, reject the promise
-              this.notificationService.error(
-                `File ${file.name}'s size exceeds the maximum limit of ${this.singleFileUploadMaxSizeMiB}MiB.`
-              );
-              reject(null);
-            } else {
-              // If the file size is within the limit, proceed
+          fileEntry.file(
+            file => {
+              if (file.size > this.singleFileUploadMaxSizeMiB * 1024 * 1024) {
+                this.notificationService.error(
+                  `File ${file.name}'s size exceeds the maximum limit of ${this.singleFileUploadMaxSizeMiB}MiB.`
+                );
+                reject(null);
+                return;
+              }
+
               resolve({
-                file: file,
+                file,
                 name: droppedFile.relativePath,
                 description: "",
                 uploadProgress: 0,
                 isUploadingFlag: false,
+                restart: false,
               });
-            }
-          }, reject);
+            },
+            err => reject(err)
+          );
         } else {
           resolve(null);
         }
@@ -99,31 +229,33 @@ export class FilesUploaderComponent {
     });
 
     Promise.allSettled(filePromises)
-      .then(results => {
-        // once all FileUploadItems are created/some of them are rejected, enter this block
+      .then(async results => {
+        const { ownerEmail, datasetName } = this.getOwnerAndName();
+
+        const activePathsPromise =
+          ownerEmail && datasetName
+            ? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => [])
+            : [];
+
+        const activePaths = await activePathsPromise;
         const successfulUploads = results
-          .filter((result): result is PromiseFulfilledResult<FileUploadItem | null> => result.status === "fulfilled")
-          .map(result => result.value)
+          .filter((r): r is PromiseFulfilledResult<FileUploadItem | null> => r.status === "fulfilled")
+          .map(r => r.value)
           .filter((item): item is FileUploadItem => item !== null);
-
-        if (successfulUploads.length > 0) {
-          // successfulUploads.forEach(fileUploadItem => {
-          //   this.addFileToNewUploadsFileTree(fileUploadItem.name, fileUploadItem);
-          // });
-          const successMessage = `${successfulUploads.length} file${successfulUploads.length > 1 ? "s" : ""} selected successfully!`;
-          this.showFileUploadBanner("success", successMessage);
+        const filteredUploads = await this.resolveConflicts(successfulUploads, activePaths);
+        if (filteredUploads.length > 0) {
+          const msg = `${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!`;
+          this.showFileUploadBanner("success", msg);
         }
-
         const failedCount = results.length - successfulUploads.length;
         if (failedCount > 0) {
           const errorMessage = `${failedCount} file${failedCount > 1 ? "s" : ""} failed to be selected.`;
           this.showFileUploadBanner("error", errorMessage);
         }
-
-        this.uploadedFiles.emit(successfulUploads);
+        this.uploadedFiles.emit(filteredUploads);
       })
       .catch(error => {
-        this.showFileUploadBanner("error", `Unexpected error: 
${error.message}`);
+        this.showFileUploadBanner("error", `Unexpected error: ${error?.message 
?? error}`);
       });
   }
 }
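The conflict prompts above are deliberately serialized. A standalone sketch of
the same pattern (illustrative names, not code from this commit): reduce()
threads a promise through the list so each prompt awaits the previous one.

    type Choice = "resume" | "restart";

    async function askSequentially<T>(
      items: T[],
      ask: (item: T) => Promise<Choice>
    ): Promise<Array<{ item: T; choice: Choice }>> {
      const out: Array<{ item: T; choice: Choice }> = [];
      // Each iteration awaits the accumulated chain before prompting,
      // so at most one prompt is pending at any moment.
      await items.reduce<Promise<void>>(async (chain, item) => {
        await chain;
        out.push({ item, choice: await ask(item) });
      }, Promise.resolve());
      return out;
    }

    // Hypothetical usage with a stubbed prompt:
    askSequentially(["a.csv", "b.csv"], async name =>
      name.startsWith("a") ? "resume" : "restart"
    ).then(console.log);

A plain for...of loop with await would behave the same; the reduce form just
avoids mutating a shared chain variable.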
diff --git a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
index 53a3c67391..f7eb492a9c 100644
--- a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
+++ b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
@@ -103,6 +103,8 @@ export class DatasetDetailComponent implements OnInit {
   versionName: string = "";
   isCreatingVersion: boolean = false;
 
+  public activeMultipartFilePaths: string[] = [];
+
   //  List of upload tasks – each task tracked by its filePath
   public uploadTasks: Array<
     MultipartUploadProgress & {
@@ -426,7 +428,8 @@ export class DatasetDetailComponent implements OnInit {
                 file.name,
                 file.file,
                 this.chunkSizeMiB * 1024 * 1024,
-                this.maxConcurrentChunks
+                this.maxConcurrentChunks,
+                file.restart
               )
               .pipe(untilDestroyed(this))
               .subscribe({
@@ -452,15 +455,24 @@ export class DatasetDetailComponent implements OnInit {
                     }
                   }
                 },
-                error: () => {
+                error: (res: unknown) => {
+                  const err = res as HttpErrorResponse;
+
+                  if (err?.status === HttpStatusCode.Conflict) {
+                    this.notificationService.error(
+                      "Upload blocked (409). Another upload is likely in 
progress for this file (another tab/browser), or the server is finalizing a 
previous upload. Please retry in a moment."
+                    );
+                  } else {
+                    this.notificationService.error("Upload failed. Please 
retry.");
+                  }
                   // Handle upload error
                   const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
 
                   if (taskIndex !== -1) {
                     this.uploadTasks[taskIndex] = {
                       ...this.uploadTasks[taskIndex],
-                      percentage: 100,
-                      status: "aborted",
+                      percentage: this.uploadTasks[taskIndex].percentage ?? 0, // keep the last observed progress rather than jumping to 100
+                      status: "failed",
                     };
                     this.scheduleHide(taskIndex);
                   }
@@ -591,7 +603,6 @@ export class DatasetDetailComponent implements OnInit {
           },
           error: (res: unknown) => {
             const err = res as HttpErrorResponse;
-
             // Already gone, treat as done
             if (err.status === 404) {
               done();
@@ -612,13 +623,17 @@ export class DatasetDetailComponent implements OnInit {
 
     abortWithRetry(0);
 
-    this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== task.filePath);
+    const idx = this.uploadTasks.findIndex(t => t.filePath === task.filePath);
+    if (idx !== -1) {
+      this.uploadTasks[idx] = { ...this.uploadTasks[idx], status: "aborted" };
+      this.scheduleHide(idx);
+    }
   }
 
-  getUploadStatus(status: "initializing" | "uploading" | "finished" | 
"aborted"): "active" | "exception" | "success" {
+  getUploadStatus(status: MultipartUploadProgress["status"]): "active" | 
"exception" | "success" {
     return status === "uploading" || status === "initializing"
       ? "active"
-      : status === "aborted"
+      : status === "aborted" || status === "failed"
         ? "exception"
         : "success";
   }
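activeMultipartFilePaths is declared above but populated elsewhere; a hedged
sketch (illustrative helper, not code from this commit) of how a component can
refresh it from the service's new type=list endpoint:

    import { Observable, firstValueFrom } from "rxjs";

    interface UploadLister {
      listMultipartUploads(ownerEmail: string, datasetName: string): Observable<string[]>;
    }

    // Returns the file paths of upload sessions that were initialized but never
    // completed or aborted; a failed listing is treated as "none known".
    export async function refreshActiveUploads(
      service: UploadLister,
      ownerEmail: string,
      datasetName: string
    ): Promise<string[]> {
      try {
        return await firstValueFrom(service.listMultipartUploads(ownerEmail, datasetName));
      } catch {
        return [];
      }
    }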
diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
index 16c580be27..386c269da0 100644
--- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
+++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
@@ -51,7 +51,7 @@ export const DATASET_GET_OWNERS_URL = DATASET_BASE_URL + "/user-dataset-owners";
 export interface MultipartUploadProgress {
   filePath: string;
   percentage: number;
-  status: "initializing" | "uploading" | "finished" | "aborted";
+  status: "initializing" | "uploading" | "finished" | "aborted" | "failed";
   uploadSpeed?: number; // bytes per second
   estimatedTimeRemaining?: number; // seconds
   totalTime?: number; // total seconds taken
@@ -154,7 +154,8 @@ export class DatasetService {
     filePath: string,
     file: File,
     partSize: number,
-    concurrencyLimit: number
+    concurrencyLimit: number,
+    restart: boolean
   ): Observable<MultipartUploadProgress> {
     const partCount = Math.ceil(file.size / partSize);
 
@@ -162,6 +163,7 @@ export class DatasetService {
       // Track upload progress (bytes) for each part independently
       const partProgress = new Map<number, number>();
 
+      let baselineUploaded = 0;
       // Progress tracking state
       let startTime: number | null = null;
       const speedSamples: number[] = [];
@@ -193,7 +195,8 @@ export class DatasetService {
         }
         lastUpdateTime = now;
 
-        const currentSpeed = elapsed > 0 ? totalUploaded / elapsed : 0;
+        const sessionUploaded = Math.max(0, totalUploaded - baselineUploaded);
+        const currentSpeed = elapsed > 0 ? sessionUploaded / elapsed : 0;
         speedSamples.push(currentSpeed);
         if (speedSamples.length > 5) {
           speedSamples.shift();
@@ -232,48 +235,42 @@ export class DatasetService {
         .set("datasetName", datasetName)
         .set("filePath", encodeURIComponent(filePath))
         .set("fileSizeBytes", file.size.toString())
-        .set("partSizeBytes", partSize.toString());
+        .set("partSizeBytes", partSize.toString())
+        .set("restart", restart);
 
-      const init$ = this.http.post<{}>(
+      const init$ = this.http.post<{ missingParts: number[]; completedPartsCount: number }>(
         `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
         {},
         { params: initParams }
       );
 
-      const initWithAbortRetry$ = init$.pipe(
-        catchError((res: unknown) => {
-          const err = res as HttpErrorResponse;
-          if (err.status !== 409) {
-            return throwError(() => err);
-          }
-
-          // Init failed because a session already exists. Abort it and retry init once.
-          return this.finalizeMultipartUpload(ownerEmail, datasetName, filePath, true).pipe(
-            // best-effort abort; if abort itself fails, let the re-init decide
-            catchError(() => EMPTY),
-            switchMap(() => init$)
-          );
-        })
-      );
-
-      const subscription = initWithAbortRetry$
+      const subscription = init$
         .pipe(
           switchMap(initResp => {
-            // Notify UI that upload is starting
+            const missingParts = (initResp?.missingParts ?? []).slice();
+            const completedPartsCount = initResp?.completedPartsCount ?? 0;
+
+            const missingBytes = missingParts.reduce((sum, partNumber) => {
+              const start = (partNumber - 1) * partSize;
+              const end = Math.min(start + partSize, file.size);
+              return sum + (end - start);
+            }, 0);
+
+            baselineUploaded = file.size - missingBytes;
+            const baselinePct = partCount > 0 ? Math.round((completedPartsCount / partCount) * 100) : 0;
+
             observer.next({
               filePath,
-              percentage: 0,
+              percentage: baselinePct,
               status: "initializing",
               uploadSpeed: 0,
               estimatedTimeRemaining: 0,
               totalTime: 0,
             });
-
             // 2. Upload each part to /multipart-upload/part using XMLHttpRequest
-            return from(Array.from({ length: partCount }, (_, i) => i)).pipe(
-              mergeMap(index => {
-                const partNumber = index + 1;
-                const start = index * partSize;
+            return from(missingParts).pipe(
+              mergeMap(partNumber => {
+                const start = (partNumber - 1) * partSize;
                 const end = Math.min(start + partSize, file.size);
                 const chunk = file.slice(start, end);
 
@@ -284,7 +281,7 @@ export class DatasetService {
                     if (event.lengthComputable) {
                       partProgress.set(partNumber, event.loaded);
 
-                      let totalUploaded = 0;
+                      let totalUploaded = baselineUploaded; // start from bytes already confirmed by the init response
                       partProgress.forEach(bytes => {
                         totalUploaded += bytes;
                       });
@@ -306,7 +303,7 @@ export class DatasetService {
                       // Mark part as fully uploaded
                       partProgress.set(partNumber, chunk.size);
 
-                      let totalUploaded = 0;
+                      let totalUploaded = baselineUploaded;
                       partProgress.forEach(bytes => {
                         totalUploaded += bytes;
                       });
@@ -385,7 +382,7 @@ export class DatasetService {
               }),
               catchError((error: unknown) => {
                 // On error, compute best-effort percentage from bytes we've seen
-                let totalUploaded = 0;
+                let totalUploaded = baselineUploaded;
                 partProgress.forEach(bytes => {
                   totalUploaded += bytes;
                 });
@@ -394,29 +391,13 @@ export class DatasetService {
                 observer.next({
                   filePath,
                   percentage,
-                  status: "aborted",
+                  status: "failed",
                   uploadSpeed: 0,
                   estimatedTimeRemaining: 0,
                   totalTime: getTotalTime(),
                 });
 
-                // Abort on backend
-                const abortParams = new HttpParams()
-                  .set("type", "abort")
-                  .set("ownerEmail", ownerEmail)
-                  .set("datasetName", datasetName)
-                  .set("filePath", encodeURIComponent(filePath));
-
-                return this.http
-                  .post(
-                    `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
-                    {},
-                    { params: abortParams }
-                  )
-                  .pipe(
-                    switchMap(() => throwError(() => error)),
-                    catchError(() => throwError(() => error))
-                  );
+                return throwError(() => error);
               })
             );
           })
@@ -429,6 +410,16 @@ export class DatasetService {
     });
   }
 
+  public listMultipartUploads(ownerEmail: string, datasetName: string): Observable<string[]> {
+    const params = new HttpParams().set("type", "list").set("ownerEmail", ownerEmail).set("datasetName", datasetName);
+
+    return this.http
+      .post<{
+        filePaths: string[];
+      }>(`${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, {}, { params })
+      .pipe(map(res => res?.filePaths ?? []));
+  }
+
   public finalizeMultipartUpload(
     ownerEmail: string,
     datasetName: string,
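The resume math in multipartUpload() above can be read in isolation. A
pure-function sketch (names illustrative, not from this commit) of how the init
response's missingParts determines the byte baseline a resumed upload starts from:

    // Bytes still to upload: sum the size of each missing part (1-indexed);
    // the final part may be shorter than partSize.
    function missingBytes(missingParts: number[], partSize: number, fileSize: number): number {
      return missingParts.reduce((sum, partNumber) => {
        const start = (partNumber - 1) * partSize;
        const end = Math.min(start + partSize, fileSize);
        return sum + (end - start);
      }, 0);
    }

    // Bytes already on the server: everything except the missing parts.
    function baselineUploaded(missingParts: number[], partSize: number, fileSize: number): number {
      return fileSize - missingBytes(missingParts, partSize, fileSize);
    }

    // Example: a 25 MiB file in 5 MiB parts (5 parts) with parts 4 and 5 missing
    // resumes from 15 MiB, i.e. 60% of the file already uploaded.
    const MiB = 1024 * 1024;
    console.log(baselineUploaded([4, 5], 5 * MiB, 25 * MiB) / MiB); // 15

Seeding totalUploaded with this baseline is what lets the progress bar start
from the resumed percentage instead of zero, and excluding it from the speed
estimate keeps the estimate honest for the current session.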
diff --git a/frontend/src/app/dashboard/type/dashboard-file.interface.ts b/frontend/src/app/dashboard/type/dashboard-file.interface.ts
index 3dd5925e34..fa394904c2 100644
--- a/frontend/src/app/dashboard/type/dashboard-file.interface.ts
+++ b/frontend/src/app/dashboard/type/dashboard-file.interface.ts
@@ -50,6 +50,7 @@ export interface FileUploadItem {
   description: string;
   uploadProgress: number;
   isUploadingFlag: boolean;
+  restart: boolean;
 }
 
 /**
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index f4728279e3..a2de095688 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -289,6 +289,7 @@ CREATE TABLE IF NOT EXISTS dataset_upload_session
     num_parts_requested INT          NOT NULL,
     file_size_bytes     BIGINT       NOT NULL,
     part_size_bytes     BIGINT       NOT NULL,
+    created_at          TIMESTAMPTZ  NOT NULL DEFAULT now(),
 
     PRIMARY KEY (uid, did, file_path),
 
diff --git a/sql/updates/20.sql b/sql/updates/20.sql
new file mode 100644
index 0000000000..058abb1d48
--- /dev/null
+++ b/sql/updates/20.sql
@@ -0,0 +1,37 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+SET search_path TO texera_db, public;
+
+BEGIN;
+
+-- Step 1: Add the column (no default yet)
+ALTER TABLE dataset_upload_session
+    ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ;
+
+-- Step 2: Add the default for future inserts
+ALTER TABLE dataset_upload_session
+    ALTER COLUMN created_at SET DEFAULT now();
+
+-- Step 3: Backfill existing rows, then enforce NOT NULL
+-- (SET NOT NULL would fail if pre-existing sessions still held NULLs)
+UPDATE dataset_upload_session
+    SET created_at = now()
+    WHERE created_at IS NULL;
+
+ALTER TABLE dataset_upload_session
+    ALTER COLUMN created_at SET NOT NULL;
+
+COMMIT;