This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 792283dadc [GLUTEN-10168][VL] Sort shuffle produce wrong partition lengths in case of spill (#10208)
792283dadc is described below
commit 792283dadc94d2004d503db4a07d1a2a07a9229a
Author: Rong Ma <[email protected]>
AuthorDate: Fri Jul 18 10:49:59 2025 +0100
[GLUTEN-10168][VL] Sort shuffle produce wrong partition lengths in case of spill (#10208)
---
cpp/core/shuffle/LocalPartitionWriter.cc | 6 +++++-
cpp/core/shuffle/rss/RssPartitionWriter.cc | 8 +++++---
cpp/velox/tests/VeloxShuffleWriterTest.cc | 23 +++++++++++++++++++++++
3 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index 457db60a5c..77734e3287 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -95,7 +95,11 @@ class LocalPartitionWriter::LocalSpiller {
arrow::Status spill(uint32_t partitionId, std::unique_ptr<InMemoryPayload> payload) {
ScopedTimer timer(&spillTime_);
- curPid_ = partitionId;
+ if (curPid_ != partitionId) {
+ // Record the write position of the new partition.
+ ARROW_ASSIGN_OR_RAISE(writePos_, os_->Tell());
+ curPid_ = partitionId;
+ }
flushed_ = false;
auto* raw = compressedOs_ != nullptr ? compressedOs_.get() : os_.get();
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index d52bd5dbf3..996c137671 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -37,7 +37,8 @@ arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
spillTime_ -= compressTime_;
}
RETURN_NOT_OK(rssOs_->Flush());
- ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], rssOs_->Tell());
+ ARROW_ASSIGN_OR_RAISE(const auto evicted, rssOs_->Tell());
+ bytesEvicted_[lastEvictedPartitionId_] += evicted;
RETURN_NOT_OK(rssOs_->Close());
}
@@ -77,7 +78,8 @@ RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayl
RETURN_NOT_OK(compressedOs_->Flush());
}
RETURN_NOT_OK(rssOs_->Flush());
- ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], rssOs_->Tell());
+ ARROW_ASSIGN_OR_RAISE(const auto evicted, rssOs_->Tell());
+ bytesEvicted_[lastEvictedPartitionId_] += evicted;
RETURN_NOT_OK(rssOs_->Close());
}
@@ -94,7 +96,7 @@ RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayl
lastEvictedPartitionId_ = partitionId;
}
- rawPartitionLengths_[partitionId] = inMemoryPayload->rawSize();
+ rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (compressedOs_ != nullptr) {
RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
} else {
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 4d56e094b9..3f6cbc6b2a 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -743,6 +743,29 @@ TEST_P(RoundRobinPartitioningShuffleWriterTest, sortMaxRows) {
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
}
+TEST_P(RoundRobinPartitioningShuffleWriterTest, sortSpill) {
+ if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) {
+ return;
+ }
+ auto shuffleWriter = createShuffleWriter(2);
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1024, &evicted));
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+ auto blockPid1 =
+ takeRows({inputVector1_, inputVector1_, inputVector1_}, {{0, 2, 4, 6, 8}, {0, 2, 4, 6, 8}, {0, 2, 4, 6, 8}});
+ auto blockPid2 =
+ takeRows({inputVector1_, inputVector1_, inputVector1_}, {{1, 3, 5, 7, 9}, {1, 3, 5, 7, 9}, {1, 3, 5, 7, 9}});
+
+ // Stop and verify.
+ shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
+}
+
INSTANTIATE_TEST_SUITE_P(
SinglePartitioningShuffleWriterGroup,
SinglePartitioningShuffleWriterTest,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]