This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 018fe8add [CELEBORN-2292] Fix ArithmeticException when PUSH_DATA_HAND_SHAKE fails before any data written
018fe8add is described below
commit 018fe8add3407c532c35edd38bd045b25176a447
Author: luogen.lg <[email protected]>
AuthorDate: Wed Apr 1 15:46:14 2026 +0800
[CELEBORN-2292] Fix ArithmeticException when PUSH_DATA_HAND_SHAKE fails before any data written
### What changes were proposed in this pull request?
Handle the case where numSubpartitions is zero in
MapPartitionDataReader.open(). When the partition is empty, treat it as a
normal empty partition and notify consumers accordingly.
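The shape of that handling can be sketched as a standalone method (a hedged sketch, not the patched Celeborn code; the name `openGuard` and its parameters are illustrative, and the real change lives in `MapPartitionDataReader.open()` as shown in the diff):

```java
import java.io.IOException;

// Illustrative sketch of the zero-subpartition guard. Returns false when the
// partition should be treated as empty, true when the normal open path may
// continue, and throws when the on-disk state is inconsistent.
public class OpenGuardSketch {
  static boolean openGuard(int numSubpartitions, long dataFileSize) throws IOException {
    if (numSubpartitions == 0) {
      if (dataFileSize == 0) {
        // Handshake failed before any data was written:
        // treat as a normal empty partition and stop here.
        return false;
      }
      // Data present but no subpartition count recorded: corrupted state.
      throw new IOException(
          "numSubpartitions=0 but data file size is " + dataFileSize);
    }
    return true; // proceed with the normal open path
  }

  public static void main(String[] args) throws IOException {
    System.out.println(openGuard(0, 0L));   // empty partition case
    System.out.println(openGuard(4, 1024L)); // normal case
  }
}
```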
### Why are the changes needed?
When the first PUSH_DATA_HAND_SHAKE request fails (e.g., times out), the client
triggers a revive with reason HARD_SPLIT. The manager adds the failed partition to
the partition locations, but numSubpartitions remains uninitialized (zero). Reading
such a partition then causes ArithmeticException: / by zero.
Since the problem originates in client-side behavior, we handle it on the worker
side first for cross-version compatibility. The separate issue that the Flink
shuffle client always revives with the fixed reason HARD_SPLIT can be addressed in
later PRs.
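The failure mode can be reproduced outside Celeborn with a minimal sketch (the INDEX_ENTRY_SIZE value here is illustrative): in Java, integer and long division by zero throws ArithmeticException, so a region size derived from numSubpartitions == 0 makes the division fail.

```java
// Minimal sketch (not Celeborn code) of the reported failure: with
// numSubpartitions == 0 the index-region size is 0, and dividing the index
// size by it throws ArithmeticException: / by zero.
public class DivideByZeroSketch {
  static final int INDEX_ENTRY_SIZE = 16; // illustrative: (offset, length) entry

  static long numRegions(long indexSize, int numSubpartitions) {
    long indexRegionSize = numSubpartitions * (long) INDEX_ENTRY_SIZE;
    return indexSize / indexRegionSize; // throws when numSubpartitions == 0
  }

  public static void main(String[] args) {
    try {
      numRegions(0L, 0);
    } catch (ArithmeticException e) {
      System.out.println("ArithmeticException: " + e.getMessage());
    }
  }
}
```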
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually tested with a hacked build that throws an exception on the first
handshake invocation. The test code is too hacky to be included in this PR.
Advice is welcome on how to add a proper unit test for this scenario without
introducing too much complexity.
Closes #3637 from pltbkd/CELEBORN-2292.
Authored-by: luogen.lg <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../flink/client/FlinkShuffleClientImpl.java | 19 +++++++++++++--
.../worker/storage/MapPartitionDataReader.java | 28 +++++++++++++++++++++-
2 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 87a80006a..924a1ffbe 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -461,7 +461,15 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
                 .toByteBuffer(),
             pushDataTimeout);
       } catch (IOException e) {
-        // ioexeption revive
+        // IOException during handshake, need to revive. TODO revive with handshake failure
+        // reason
+        logger.warn(
+            "PushDataHandShake failed for shuffle {} mapId {} attemptId {} locationId {}. Triggering revive.",
+            shuffleId,
+            mapId,
+            attemptId,
+            location.getUniqueId(),
+            e);
         return revive(shuffleId, mapId, attemptId, location);
       }
       if (pushDataHandShakeResponse.hasRemaining()
if (pushDataHandShakeResponse.hasRemaining()
@@ -512,7 +520,14 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
                 .toByteBuffer(),
             pushDataTimeout);
       } catch (IOException e) {
-        // ioexeption revive
+        // IOException during regionStart, need to revive
+        logger.warn(
+            "RegionStart failed for shuffle {} mapId {} attemptId {} locationId {}. Triggering revive.",
+            shuffleId,
+            mapId,
+            attemptId,
+            location.getUniqueId(),
+            e);
         return revive(shuffleId, mapId, attemptId, location);
       }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 8f8a2c353..910ce7951 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -138,8 +138,34 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
     this.dataFileChannel = dataFileChannel;
     this.dataFileChannelSize = dataFileChannel.size();
     this.indexFileChannel = indexFileChannel;
+
+    int numSubpartitions = mapFileMeta.getNumSubpartitions();
+    // If numSubpartitions is 0, it means pushDataHandShake was never successfully called.
+    // This can happen when the first handshake failed before any data was written.
+    // In this case, check if data file is empty, and if so, treat this as an empty partition.
+    if (numSubpartitions == 0) {
+      if (dataFileChannelSize == 0) {
+        logger.warn(
+            "Partition {} has numSubpartitions=0 and empty data file, this indicates a failed "
+                + "handshake before any data was written. Treating as empty partition.",
+            fileInfo.getFilePath());
+        isOpen = true;
+        // Mark as finished and notify consumer with backlog=1 so they can complete normally
+        closeReader();
+        notifyBacklog(1);
+        return;
+      } else {
+        // Data file has content but numSubpartitions is 0, this is a corrupted state
+        throw new FileCorruptedException(
+            "Partition "
+                + fileInfo.getFilePath()
+                + " has numSubpartitions=0 but data file size is "
+                + dataFileChannelSize);
+      }
+    }
+
     // index is (offset,length)
-    long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
+    long indexRegionSize = numSubpartitions * (long) INDEX_ENTRY_SIZE;
     this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize);
     updateConsumingOffset();