This is an automated email from the ASF dual-hosted git repository.

aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 0aa4ad4ec5 feat: add centralized LakeFS error handling for multipart upload and dataset version operations (#4177)
0aa4ad4ec5 is described below

commit 0aa4ad4ec593adb97330f7a3102a0e7cadb4152d
Author: Xuan Gu <[email protected]>
AuthorDate: Tue Jan 27 13:16:44 2026 -0800

    feat: add centralized LakeFS error handling for multipart upload and dataset version operations (#4177)
    
    
    ### What changes were proposed in this PR?
    This PR adds centralized error handling for LakeFS API calls in
    `DatasetResource`, covering multipart upload and dataset version
    operations. It improves error visibility by returning meaningful HTTP
    status codes and error messages to the frontend. For now, the wrapper is
    applied only to the calls listed below; it has not yet been extended to
    all LakeFS calls.
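    
    A minimal sketch of the resulting call-site pattern (excerpted from the
    diff below; `withLakeFSErrorHandling`, `LakeFSStorageClient.createCommit`,
    and the surrounding variables are as they appear in `DatasetResource`):
    
    ```scala
    // An ApiException thrown by the SDK call inside the block is converted
    // into a JAX-RS exception carrying a JSON {"message": ...} body and a
    // matching HTTP status code.
    val commit = withLakeFSErrorHandling {
      LakeFSStorageClient.createCommit(
        repoName = repositoryName,
        branch = "main",
        commitMessage = s"Created dataset version: $newVersionName"
      )
    }
    ```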
    
    ### Changes
    - Converts a LakeFS `ApiException` into a JAX-RS HTTP exception with a
      matching status code (400, 401, 403, 404, 409, 410, 412, 416, 420;
      other errors map to 500); an example error body is shown below
    - Applies the wrapper to `createDatasetVersion`, `getDatasetDiff`,
      `initMultipartUpload`, `finishMultipartUpload`, `abortMultipartUpload`
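    
    For illustration, a 409 conflict from LakeFS now reaches the frontend as
    an HTTP 409 response whose JSON body carries the mapped message (text
    taken from the fallback table in `LakeFSExceptionHandler` below):
    
    ```json
    { "message": "LakeFS reported a conflict. Another operation may be in progress." }
    ```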
    
    ### Any related issues, documentation, discussions?
    Related to #4176
    
    ### How was this PR tested?
    - Existing automated tests pass
    - Added test cases in `DatasetResourceSpec` covering the error response
      body and both the 400 and 500 mappings (a wrapper-level sketch follows
      this list)
    - Manually tested the multipart upload error flows
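    
    For reference, a focused check of the wrapper alone would look roughly
    like the following (a hypothetical sketch, not one of the tests added in
    this PR; it assumes the OpenAPI-generated `ApiException(code, message)`
    constructor in the LakeFS SDK and runs inside a ScalaTest spec such as
    `DatasetResourceSpec`):
    
    ```scala
    import io.lakefs.clients.sdk.ApiException
    import jakarta.ws.rs.WebApplicationException
    import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
    
    // A 409 from the SDK should surface as a WebApplicationException
    // whose response keeps the same status code.
    val ex = intercept[WebApplicationException] {
      withLakeFSErrorHandling {
        throw new ApiException(409, "branch is being modified")
      }
    }
    ex.getResponse.getStatus shouldEqual 409
    ```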
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Generated-by: Claude (Anthropic)
---
 .../texera/service/resource/DatasetResource.scala  | 67 +++++++++++-------
 .../service/util/LakeFSExceptionHandler.scala      | 79 ++++++++++++++++++++++
 .../service/resource/DatasetResourceSpec.scala     | 52 ++++++++++++++
 3 files changed, 172 insertions(+), 26 deletions(-)

diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index 39cba2c84f..5a2658e065 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -72,6 +72,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
 import org.jooq.exception.DataAccessException
 import software.amazon.awssdk.services.s3.model.UploadPartResponse
 import org.apache.commons.io.FilenameUtils
+import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
 
 import java.sql.SQLException
 import scala.util.Try
@@ -360,7 +361,9 @@ class DatasetResource {
       val repositoryName = dataset.getRepositoryName
 
       // Check if there are any changes in LakeFS before creating a new version
-      val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
+      val diffs = withLakeFSErrorHandling {
+        LakeFSStorageClient.retrieveUncommittedObjects(repoName = repositoryName)
+      }
 
       if (diffs.isEmpty) {
         throw new WebApplicationException(
@@ -384,11 +387,13 @@ class DatasetResource {
       }
 
       // Create a commit in LakeFS
-      val commit = LakeFSStorageClient.createCommit(
-        repoName = repositoryName,
-        branch = "main",
-        commitMessage = s"Created dataset version: $newVersionName"
-      )
+      val commit = withLakeFSErrorHandling {
+        LakeFSStorageClient.createCommit(
+          repoName = repositoryName,
+          branch = "main",
+          commitMessage = s"Created dataset version: $newVersionName"
+        )
+      }
 
       if (commit == null || commit.getId == null) {
         throw new WebApplicationException(
@@ -412,7 +417,9 @@ class DatasetResource {
         .into(classOf[DatasetVersion])
 
       // Retrieve committed file structure
-      val fileNodes = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
+      val fileNodes = withLakeFSErrorHandling {
+        LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
+      }
 
       DashboardDatasetVersion(
         insertedVersion,
@@ -973,7 +980,9 @@ class DatasetResource {
 
       // Retrieve staged (uncommitted) changes from LakeFS
       val dataset = getDatasetByID(ctx, did)
-      val lakefsDiffs = LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
+      val lakefsDiffs = withLakeFSErrorHandling {
+        LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
+      }
 
       // Convert LakeFS Diff objects to our custom Diff case class
       lakefsDiffs.map(d =>
@@ -1578,11 +1587,13 @@ class DatasetResource {
         )
       }
 
-      val presign = LakeFSStorageClient.initiatePresignedMultipartUploads(
-        repositoryName,
-        filePath,
-        numPartsValue
-      )
+      val presign = withLakeFSErrorHandling {
+        LakeFSStorageClient.initiatePresignedMultipartUploads(
+          repositoryName,
+          filePath,
+          numPartsValue
+        )
+      }
 
       val uploadIdStr = presign.getUploadId
       val physicalAddr = presign.getPhysicalAddress
@@ -1772,13 +1783,15 @@ class DatasetResource {
           )
           .toList
 
-      val objectStats = LakeFSStorageClient.completePresignedMultipartUploads(
-        dataset.getRepositoryName,
-        filePath,
-        uploadId,
-        partsList,
-        physicalAddr
-      )
+      val objectStats = withLakeFSErrorHandling {
+        LakeFSStorageClient.completePresignedMultipartUploads(
+          dataset.getRepositoryName,
+          filePath,
+          uploadId,
+          partsList,
+          physicalAddr
+        )
+      }
 
       // FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
       val actualSizeBytes =
@@ -1884,12 +1897,14 @@ class DatasetResource {
         )
       }
 
-      LakeFSStorageClient.abortPresignedMultipartUploads(
-        dataset.getRepositoryName,
-        filePath,
-        session.getUploadId,
-        physicalAddr
-      )
+      withLakeFSErrorHandling {
+        LakeFSStorageClient.abortPresignedMultipartUploads(
+          dataset.getRepositoryName,
+          filePath,
+          session.getUploadId,
+          physicalAddr
+        )
+      }
 
       // Delete session; parts removed via ON DELETE CASCADE
       ctx
diff --git a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
new file mode 100644
index 0000000000..c1997fb647
--- /dev/null
+++ b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import jakarta.ws.rs._
+import jakarta.ws.rs.core.{MediaType, Response}
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+
+object LakeFSExceptionHandler {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val fallbackMessages = Map(
+    400 -> "LakeFS rejected the request. Please verify the parameters (repository/branch/path) and try again.",
+    401 -> "Authentication with LakeFS failed.",
+    403 -> "Permission denied by LakeFS.",
+    404 -> "LakeFS resource not found. The repository/branch/object may not exist.",
+    409 -> "LakeFS reported a conflict. Another operation may be in progress.",
+    420 -> "Too many requests to LakeFS."
+  ).withDefaultValue(
+    "LakeFS request failed due to an unexpected server error."
+  )
+
+  /**
+    * Wraps a LakeFS call with centralized error handling.
+    */
+  def withLakeFSErrorHandling[T](call: => T): T = {
+    try {
+      call
+    } catch {
+      case e: io.lakefs.clients.sdk.ApiException => handleException(e)
+    }
+  }
+
+  /**
+    * Converts a LakeFS ApiException into the appropriate HTTP exception.
+    */
+  private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = {
+    val code = e.getCode
+    val rawBody = Option(e.getResponseBody).filter(_.nonEmpty)
+    val message = fallbackMessages(code)
+
+    logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}")
+
+    def errorResponse(status: Int): Response =
+      Response
+        .status(status)
+        .entity(Map("message" -> message).asJava)
+        .`type`(MediaType.APPLICATION_JSON)
+        .build()
+
+    throw (code match {
+      case 400                      => new BadRequestException(errorResponse(400))
+      case 401                      => new NotAuthorizedException(errorResponse(401))
+      case 403                      => new ForbiddenException(errorResponse(403))
+      case 404                      => new NotFoundException(errorResponse(404))
+      case c if c >= 400 && c < 500 => new WebApplicationException(errorResponse(c))
+      case _                        => new InternalServerErrorException(errorResponse(500))
+    })
+  }
+}
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index 2902626a70..0d37298e9e 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -1847,4 +1847,56 @@ class DatasetResourceSpec
     response.getStatus shouldEqual 307
     response.getHeaderString("Location") should not be null
   }
+
+  "LakeFS error handling" should "return 500 when ETag is invalid, with the 
message included in the error response body" in {
+    val filePath = uniqueFilePath("error-body")
+
+    initUpload(filePath, 2).getStatus shouldEqual 200
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION_PART)
+      .set(DATASET_UPLOAD_SESSION_PART.ETAG, "BAD")
+      .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+      .execute()
+
+    val ex = intercept[WebApplicationException] {
+      finishUpload(filePath)
+    }
+
+    ex.getResponse.getStatus shouldEqual 500
+    Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
+      "LakeFS request failed due to an unexpected server error."
+    )
+
+    abortUpload(filePath)
+  }
+
+  it should "return 400 when physicalAddress is invalid" in {
+    val filePath = uniqueFilePath("missing-physical-address")
+
+    initUpload(filePath, 2).getStatus shouldEqual 200
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION)
+      .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, "BAD")
+      .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId))
+      .execute()
+
+    val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+    ex.getResponse.getStatus shouldEqual 400
+    Option(ex.getResponse.getEntity).map(_.toString).getOrElse("") should include(
+      "LakeFS rejected the request"
+    )
+
+    intercept[WebApplicationException] {
+      abortUpload(filePath)
+    }.getResponse.getStatus shouldEqual 400
+  }
 }
