This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 018fe8add [CELEBORN-2292] Fix ArithmeticException when 
PUSH_DATA_HAND_SHAKE fails before any data written
018fe8add is described below

commit 018fe8add3407c532c35edd38bd045b25176a447
Author: luogen.lg <[email protected]>
AuthorDate: Wed Apr 1 15:46:14 2026 +0800

    [CELEBORN-2292] Fix ArithmeticException when PUSH_DATA_HAND_SHAKE fails 
before any data written
    
    ### What changes were proposed in this pull request?
    
    Handle the case where numSubpartitions is zero in 
MapPartitionDataReader.open(). When the partition is empty, treat it as a 
normal empty partition and notify consumers accordingly.
    
    ### Why are the changes needed?
    
    When the first PUSH_DATA_HAND_SHAKE request fails (e.g., timeout), client 
triggers revive with reason HARD_SPLIT. Manager adds the failed partition to 
partition locations, but numSubpartitions remains uninitialized (zero). Reading 
such partition causes ArithmeticException: / by zero.
    Since this is caused by client-side behavior, we handle it on the worker 
side first for cross-version compatibility. The issue that the Flink shuffle 
client revives with the fixed reason HARD_SPLIT can be addressed in later PRs.
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested with a hacked version that throws an exception on the first 
handshake invocation. However, the test code is too hacky to be included in this 
PR. Advice is welcome on how to add a proper unit test for this scenario without 
introducing too much complexity.
    
    Closes #3637 from pltbkd/CELEBORN-2292.
    
    Authored-by: luogen.lg <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../flink/client/FlinkShuffleClientImpl.java       | 19 +++++++++++++--
 .../worker/storage/MapPartitionDataReader.java     | 28 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 87a80006a..924a1ffbe 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -461,7 +461,15 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                         .toByteBuffer(),
                     pushDataTimeout);
           } catch (IOException e) {
-            // ioexeption revive
+            // IOException during handshake, need to revive. TODO revive with 
handshake failure
+            // reason
+            logger.warn(
+                "PushDataHandShake failed for shuffle {} mapId {} attemptId {} 
locationId {}. Triggering revive.",
+                shuffleId,
+                mapId,
+                attemptId,
+                location.getUniqueId(),
+                e);
             return revive(shuffleId, mapId, attemptId, location);
           }
           if (pushDataHandShakeResponse.hasRemaining()
@@ -512,7 +520,14 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                         .toByteBuffer(),
                     pushDataTimeout);
           } catch (IOException e) {
-            // ioexeption revive
+            // IOException during regionStart, need to revive
+            logger.warn(
+                "RegionStart failed for shuffle {} mapId {} attemptId {} 
locationId {}. Triggering revive.",
+                shuffleId,
+                mapId,
+                attemptId,
+                location.getUniqueId(),
+                e);
             return revive(shuffleId, mapId, attemptId, location);
           }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 8f8a2c353..910ce7951 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -138,8 +138,34 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       this.dataFileChannel = dataFileChannel;
       this.dataFileChannelSize = dataFileChannel.size();
       this.indexFileChannel = indexFileChannel;
+
+      int numSubpartitions = mapFileMeta.getNumSubpartitions();
+      // If numSubpartitions is 0, it means pushDataHandShake was never 
successfully called.
+      // This can happen when the first handshake failed before any data was 
written.
+      // In this case, check if data file is empty, and if so, treat this as 
an empty partition.
+      if (numSubpartitions == 0) {
+        if (dataFileChannelSize == 0) {
+          logger.warn(
+              "Partition {} has numSubpartitions=0 and empty data file, this 
indicates a failed "
+                  + "handshake before any data was written. Treating as empty 
partition.",
+              fileInfo.getFilePath());
+          isOpen = true;
+          // Mark as finished and notify consumer with backlog=1 so they can 
complete normally
+          closeReader();
+          notifyBacklog(1);
+          return;
+        } else {
+          // Data file has content but numSubpartitions is 0, this is a 
corrupted state
+          throw new FileCorruptedException(
+              "Partition "
+                  + fileInfo.getFilePath()
+                  + " has numSubpartitions=0 but data file size is "
+                  + dataFileChannelSize);
+        }
+      }
+
       // index is (offset,length)
-      long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) 
INDEX_ENTRY_SIZE;
+      long indexRegionSize = numSubpartitions * (long) INDEX_ENTRY_SIZE;
       this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize);
 
       updateConsumingOffset();

Reply via email to