This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e62d9244fb039fdbdaac987747c8ab4453c1450b
Author: yiguolei <[email protected]>
AuthorDate: Thu Mar 5 11:38:13 2026 +0800

    fix unit test bugs
---
 .../partitioned_aggregation_sink_operator_test.cpp | 121 +++-
 ...artitioned_aggregation_source_operator_test.cpp | 196 +++++-
 .../partitioned_aggregation_test_helper.cpp        |   3 -
 .../partitioned_hash_join_probe_operator_test.cpp  | 728 +++++++++++++++------
 .../partitioned_hash_join_sink_operator_test.cpp   | 118 +++-
 .../operator/partitioned_hash_join_test_helper.cpp |  18 +-
 .../operator/partitioned_hash_join_test_helper.h   |   4 +-
 .../operator/spill_sort_sink_operator_test.cpp     | 134 +++-
 .../operator/spill_sort_source_operator_test.cpp   | 315 ++++++++-
 .../pipeline/operator/spill_sort_test_helper.cpp   |   5 +-
 10 files changed, 1349 insertions(+), 293 deletions(-)

diff --git a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
index 4f07db2fa98..c0989702841 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -231,7 +231,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) {
             local_state->_runtime_state->get_sink_local_state());
     ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
     ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
@@ -293,7 +293,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) {
             local_state->_runtime_state->get_sink_local_state());
     ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
     ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
@@ -355,7 +355,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) {
             local_state->_runtime_state->get_sink_local_state());
     ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
     auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
@@ -430,8 +430,121 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) {
     ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
 
     SpillableDebugPointHelper dp_helper("fault_inject::spill_file::spill_block");
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_FALSE(st.ok()) << "spilll status should be failed";
 }
 
+// Test multiple consecutive revoke_memory calls to verify repeated spilling works.
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithMultipleRevokes) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state());
+
+    // Round 1: sink → revoke
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4, 5});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 5}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink round 1 failed: " << st.to_string();
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke round 1 failed: " << st.to_string();
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    // Round 2: sink more → revoke again
+    auto block2 =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({6, 7, 8, 9, 10});
+    block2.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {6, 7, 8, 9, 10}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block2, false);
+    ASSERT_TRUE(st.ok()) << "sink round 2 failed: " << st.to_string();
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke round 2 failed: " << st.to_string();
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // Verify spill counters accumulated across rounds
+    auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows_counter != nullptr);
+    ASSERT_EQ(spill_write_rows_counter->value(), 10) << "SpillWriteRows should be 10 (5 per round)";
+
+    // Sink EOS
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink eos failed: " << st.to_string();
+}
+
+// Test revoke_memory when hash table is empty (no data sunk).
+TEST_F(PartitionedAggregationSinkOperatorTest, RevokeMemoryEmpty) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Revoke with no data is a valid operation
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory on empty should succeed: " << st.to_string();
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows_counter != nullptr);
+    ASSERT_EQ(spill_write_rows_counter->value(), 0);
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
index 7bf803efcb4..0a5f644faf1 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
@@ -138,8 +138,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) {
             _helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    local_state->_copy_shared_spill_profile = false;
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -228,8 +226,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
             _helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    local_state->_copy_shared_spill_profile = false;
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -297,10 +293,10 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) {
     st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
-    ASSERT_TRUE(shared_state->is_spilled);
+    ASSERT_TRUE(shared_state->_is_spilled);
 
     st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
@@ -325,8 +321,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) {
             _helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    local_state->_copy_shared_spill_profile = false;
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -399,10 +393,10 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) {
     st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
-    ASSERT_TRUE(shared_state->is_spilled);
+    ASSERT_TRUE(shared_state->_is_spilled);
 
     st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
@@ -427,8 +421,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) {
             _helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    local_state->_copy_shared_spill_profile = false;
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -444,4 +436,183 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) {
 
     ASSERT_FALSE(st.ok());
 }
-} // namespace doris::pipeline
+
+// Test spill → recover cycle with large data to verify all rows come back.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillLargeData) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.operator_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .shared_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok());
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Create data with many distinct values to test larger spill sizes
+    const size_t count = 10000;
+    std::vector<int32_t> data(count);
+    std::iota(data.begin(), data.end(), 0);
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data);
+    block.insert(
+            vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(data));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // Sink EOS
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok());
+
+    // Now read back via source
+    LocalStateInfo info {.parent_profile = _helper.operator_profile.get(),
+                         .scan_ranges = {},
+                         .shared_state = shared_state.get(),
+                         .shared_state_map = {},
+                         .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    block.clear();
+    bool eos = false;
+    size_t rows = 0;
+    while (!eos) {
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        rows += block.rows();
+        block.clear_column_data();
+    }
+
+    ASSERT_TRUE(eos);
+    // With GROUP BY, all distinct keys should come back
+    ASSERT_EQ(rows, count) << "Expected " << count << " distinct rows";
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+}
+
+// Test multiple spill+recover cycles: sink → revoke → sink more → revoke → eos → source.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithMultipleSpills) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.operator_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .shared_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok());
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Round 1: sink {1,2,3,4} → revoke
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // Round 2: sink {5,6,7,8} → revoke
+    block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({5, 6, 7, 8});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {5, 6, 7, 8}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Sink EOS
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok());
+
+    // Read back via source
+    LocalStateInfo info {.parent_profile = _helper.operator_profile.get(),
+                         .scan_ranges = {},
+                         .shared_state = shared_state.get(),
+                         .shared_state_map = {},
+                         .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    block.clear();
+    bool eos = false;
+    size_t rows = 0;
+    while (!eos) {
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        rows += block.rows();
+        block.clear_column_data();
+    }
+
+    ASSERT_TRUE(eos);
+    ASSERT_EQ(rows, 8) << "Should recover all 8 distinct rows from 2 spill rounds";
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+}
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index caa763aa7c2..52e0b64e2ae 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -201,7 +201,6 @@ PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_
     local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
     local_state->init_spill_read_counters();
     local_state->init_spill_write_counters();
-    local_state->_copy_shared_spill_profile = false;
     local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
 
     state->emplace_local_state(probe_operator->operator_id(), std::move(local_state_uptr));
@@ -224,8 +223,6 @@ PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_loca
             sink_operator->dests_id().front(), sink_operator->operator_id(),
             "PartitionedHashJoinTestDep");
 
-    shared_state->setup_shared_profile(local_state->custom_profile());
-
     state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr));
     return local_state;
 }
diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index 091187afdfe..8146957cefe 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -212,15 +212,15 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks) {
     std::cout << "profile: " << local_state->custom_profile()->pretty_print() << std::endl;
 
     for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) {
-        if (!local_state->_probe_spilling_files[i]) {
+        if (!local_state->_probe_spilling_groups[i]) {
             continue;
         }
         ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(
-                local_state->_probe_spilling_files[i]);
-        local_state->_probe_spilling_files[i].reset();
+                local_state->_probe_spilling_groups[i]);
+        local_state->_probe_spilling_groups[i].reset();
     }
 
-    auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
+    auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows");
     ASSERT_EQ(write_rows_counter->value(),
               (PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT / 2) * 3 + 3 * 1024 * 1024);
 }
@@ -232,38 +232,40 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDisk) {
     auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), shared_state);
 
-    // Create and register a spill stream for testing
-    const uint32_t test_partition = 0;
-    auto& spill_file = local_state->_probe_spilling_files[test_partition];
+    // Create and register a spill file for testing
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), spill_file,
-                                            print_id(_helper.runtime_state->query_id()),
-                                            "hash_probe", probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    // Write some test data to spill stream
+    // Write some test data to spill file
     {
         vectorized::Block block =
                 vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-        ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), block, false).ok());
-        ASSERT_TRUE(spill_file->spill_eof().ok());
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok());
+        ASSERT_TRUE(writer->close().ok());
     }
 
-    // Test recovery
-    bool has_data = false;
+    // Test recovery using JoinSpillPartitionInfo
+    JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0);
     ASSERT_TRUE(local_state
-                        ->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
-                                                         test_partition, has_data)
+                        ->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                              partition_info)
                         .ok());
-    ASSERT_TRUE(has_data);
 
     std::cout << "profile: " << local_state->custom_profile()->pretty_print() << std::endl;
 
-    // Verify recovered data
-    auto& probe_blocks = local_state->_probe_blocks[test_partition];
+    // Verify recovered data (now in _queue_probe_blocks)
+    auto& probe_blocks = local_state->_queue_probe_blocks;
     ASSERT_FALSE(probe_blocks.empty());
     ASSERT_EQ(probe_blocks[0].rows(), 3);
 
@@ -272,11 +274,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDisk) {
             local_state->custom_profile()->get_counter("SpillRecoveryProbeRows");
     ASSERT_EQ(recovery_rows_counter->value(), 3);
     auto* recovery_blocks_counter =
-            local_state->custom_profile()->get_counter("SpillReadBlockCount");
+            local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks");
     ASSERT_EQ(recovery_blocks_counter->value(), 1);
 
     // Verify stream cleanup
-    ASSERT_EQ(local_state->_probe_spilling_files[test_partition], nullptr);
+    ASSERT_EQ(partition_info.probe_file, nullptr);
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData) {
@@ -286,19 +288,17 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData
     auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), shared_state);
 
-    // Create and register a spill stream for testing
-    const uint32_t test_partition = 0;
-    auto& spill_file = local_state->_probe_spilling_files[test_partition];
+    // Create and register a spill file for testing
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), spill_file,
-                                            print_id(_helper.runtime_state->query_id()),
-                                            "hash_probe", probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    // Write some test data to spill stream
+    // Write some test data to spill file
     {
         // create block larger than 32MB(4 * (8 * 1024 * 1024 + 10))
         std::vector<int32_t> large_data(8 * 1024 * 1024 + 10);
@@ -306,41 +306,51 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData
         vectorized::Block large_block =
                 vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(large_data);
 
-        ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), large_block, false).ok());
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), large_block).ok());
 
         vectorized::Block block =
                 vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-        ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), block, false).ok());
-        ASSERT_TRUE(spill_file->spill_eof().ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok());
+        ASSERT_TRUE(writer->close().ok());
     }
 
-    // Test recovery
-    bool has_data = true;
-    while (has_data) {
+    // Test recovery using JoinSpillPartitionInfo
+    JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0);
+    while (partition_info.probe_file) {
         ASSERT_TRUE(local_state
-                            ->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
-                                                             test_partition, has_data)
+                            ->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                                  partition_info)
                             .ok());
     }
 
     std::cout << "profile: " << local_state->custom_profile()->pretty_print() << std::endl;
 
-    // Verify recovered data
-    auto& probe_blocks = local_state->_probe_blocks[test_partition];
+    // Verify recovered data (now in _queue_probe_blocks)
+    auto& probe_blocks = local_state->_queue_probe_blocks;
     ASSERT_FALSE(probe_blocks.empty());
-    ASSERT_EQ(probe_blocks[0].rows(), 8 * 1024 * 1024 + 10);
-    ASSERT_EQ(probe_blocks[1].rows(), 3);
+
+    // Count total recovered rows
+    int64_t total_rows = 0;
+    for (const auto& block : probe_blocks) {
+        total_rows += block.rows();
+    }
+    ASSERT_EQ(total_rows, 8 * 1024 * 1024 + 10 + 3);
 
     // Verify counters
     auto* recovery_rows_counter =
             local_state->custom_profile()->get_counter("SpillRecoveryProbeRows");
     ASSERT_EQ(recovery_rows_counter->value(), 3 + 8 * 1024 * 1024 + 10);
     auto* recovery_blocks_counter =
-            local_state->custom_profile()->get_counter("SpillReadBlockCount");
+            local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks");
     ASSERT_EQ(recovery_blocks_counter->value(), 2);
 
     // Verify stream cleanup
-    ASSERT_EQ(local_state->_probe_spilling_files[test_partition], nullptr);
+    ASSERT_EQ(partition_info.probe_file, nullptr);
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskEmpty) {
@@ -350,31 +360,35 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverProbeBlocksFromDiskEmpty) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Test multiple cases
-    const uint32_t test_partition = 0;
-
-    auto& spilled_stream = local_state->_probe_spilling_files[test_partition];
+    // Create an empty spill file
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_probe", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
-    ASSERT_TRUE(spilled_stream->spill_eof().ok());
+    // Write nothing, just close the writer
+    {
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
 
-    bool has_data = false;
+    JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0);
     ASSERT_TRUE(local_state
-                        
->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
-                                                         test_partition, 
has_data)
+                        
->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                              partition_info)
                         .ok());
-    ASSERT_TRUE(has_data);
 
-    ASSERT_TRUE(local_state->_probe_blocks[test_partition].empty())
-            << "probe blocks not empty: " << 
local_state->_probe_blocks[test_partition].size();
+    ASSERT_TRUE(local_state->_queue_probe_blocks.empty())
+            << "probe blocks not empty: " << 
local_state->_queue_probe_blocks.size();
 
-    ASSERT_TRUE(spilled_stream == nullptr);
+    ASSERT_TRUE(partition_info.probe_file == nullptr);
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) {
@@ -384,34 +398,36 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverProbeBlocksFromDiskError) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Test multiple cases
-    const uint32_t test_partition = 0;
-
-    auto& spilling_file = local_state->_probe_spilling_files[test_partition];
+    // Create a spill file and write data
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilling_file,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_probe", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    // Write some test data to spill stream
+    // Write some test data to spill file
     {
         vectorized::Block block =
                 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-        ASSERT_TRUE(spilling_file->spill_block(_helper.runtime_state.get(), 
block, false).ok());
-        ASSERT_TRUE(spilling_file->spill_eof().ok());
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
block).ok());
+        ASSERT_TRUE(writer->close().ok());
     }
 
     SpillableDebugPointHelper 
dp_helper("fault_inject::spill_file::read_next_block");
-    bool has_data = false;
-    auto status = 
local_state->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
-                                                              test_partition, 
has_data);
+    JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0);
+    auto status = 
local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   
partition_info);
 
-    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spilling_file);
-    spilling_file.reset();
+    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spill_file);
+    spill_file.reset();
 
     ASSERT_FALSE(status.ok());
     ASSERT_TRUE(status.to_string().find("fault_inject spill_file 
read_next_block") !=
@@ -428,33 +444,35 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDisk) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Create and register spill stream with test data
-    const uint32_t test_partition = 0;
-    auto& spilled_stream = 
local_state->_shared_state->_spilled_files[test_partition];
+    // Create and register spill file with test data
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_build", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
     // Write test data
     {
         vectorized::Block block =
                 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-        ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), 
block, false).ok());
-        ASSERT_TRUE(spilled_stream->spill_eof().ok());
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
block).ok());
+        ASSERT_TRUE(writer->close().ok());
     }
 
-    // Test recovery
-    bool has_data = false;
+    // Test recovery using JoinSpillPartitionInfo
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
     ASSERT_TRUE(local_state
-                        
->recover_build_blocks_from_disk(_helper.runtime_state.get(),
-                                                         test_partition, 
has_data)
+                        
->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                              partition_info)
                         .ok());
-    ASSERT_TRUE(has_data);
 
     // Verify recovered data
     ASSERT_TRUE(local_state->_recovered_build_block != nullptr);
@@ -465,11 +483,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDisk) {
             
local_state->custom_profile()->get_counter("SpillRecoveryBuildRows");
     ASSERT_EQ(recovery_rows_counter->value(), 3);
     auto* recovery_blocks_counter =
-            local_state->custom_profile()->get_counter("SpillReadBlockCount");
+            
local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks");
     ASSERT_EQ(recovery_blocks_counter->value(), 1);
 
     // Verify stream cleanup
-    ASSERT_EQ(local_state->_shared_state->_spilled_files[test_partition], 
nullptr);
+    ASSERT_EQ(partition_info.build_file, nullptr);
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, need_more_input_data) {
@@ -554,11 +572,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
get_reserve_mem_size) {
     local_state->_shared_state->_is_spilled = true;
     local_state->_child_eos = false;
 
-    local_state->_need_to_setup_internal_operators = false;
+    local_state->_need_to_setup_queue_partition = false;
     
ASSERT_EQ(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()),
               vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM);
 
-    local_state->_need_to_setup_internal_operators = true;
+    local_state->_need_to_setup_queue_partition = true;
     
ASSERT_GT(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()),
               vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM);
 
@@ -583,28 +601,33 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskEmpty) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Test empty stream
-    const uint32_t test_partition = 0;
-    auto& spilled_stream = 
local_state->_shared_state->_spilled_files[test_partition];
+    // Create an empty spill file
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_build", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    ASSERT_TRUE(spilled_stream->spill_eof().ok());
+    // Write nothing, just close the writer
+    {
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
 
-    bool has_data = false;
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
     ASSERT_TRUE(local_state
-                        
->recover_build_blocks_from_disk(_helper.runtime_state.get(),
-                                                         test_partition, 
has_data)
+                        
->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                              partition_info)
                         .ok());
-    ASSERT_TRUE(has_data);
 
-    ASSERT_EQ(spilled_stream, nullptr);
+    ASSERT_EQ(partition_info.build_file, nullptr);
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 }
 
@@ -616,19 +639,17 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskLargeData
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Test empty stream
-    const uint32_t test_partition = 0;
-    auto& spilled_stream = 
local_state->_shared_state->_spilled_files[test_partition];
+    // Create spill file for large data test
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_build", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    // Write some test data to spill stream
+    // Write some test data to spill file
     {
         // create block larger than 32MB(4 * (8 * 1024 * 1024 + 10))
         std::vector<int32_t> large_data(8 * 1024 * 1024 + 10);
@@ -636,26 +657,28 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskLargeData
         vectorized::Block large_block =
                 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(large_data);
 
-        ASSERT_TRUE(
-                spilled_stream->spill_block(_helper.runtime_state.get(), 
large_block, false).ok());
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
large_block).ok());
 
         vectorized::Block block =
                 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-        ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), 
block, false).ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
block).ok());
+        ASSERT_TRUE(writer->close().ok());
     }
-    ASSERT_TRUE(spilled_stream->spill_eof().ok());
 
-    bool has_data = false;
-    do {
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    while (partition_info.build_file) {
         ASSERT_TRUE(local_state
-                            
->recover_build_blocks_from_disk(_helper.runtime_state.get(),
-                                                             test_partition, 
has_data)
+                            
->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                  
partition_info)
                             .ok());
 
         ASSERT_TRUE(local_state->_recovered_build_block);
-    } while (has_data);
-
-    ASSERT_EQ(spilled_stream, nullptr);
+    }
 
     // Verify recovered data
     ASSERT_EQ(local_state->_recovered_build_block->rows(), 8 * 1024 * 1024 + 
10 + 3);
@@ -665,12 +688,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskLargeData
             
local_state->custom_profile()->get_counter("SpillRecoveryBuildRows");
     ASSERT_EQ(recovery_rows_counter->value(), 8 * 1024 * 1024 + 10 + 3);
     auto* recovery_blocks_counter =
-            local_state->custom_profile()->get_counter("SpillReadBlockCount");
+            
local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks");
     ASSERT_EQ(recovery_blocks_counter->value(), 2);
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) {
-    // Similar setup code as above...
     // Similar setup as above...
     auto [probe_operator, sink_operator] = _helper.create_operators();
 
@@ -678,28 +700,34 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskError) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // Test empty stream
-    const uint32_t test_partition = 0;
-    auto& spilled_stream = 
local_state->_shared_state->_spilled_files[test_partition];
+    // Create an empty spill file
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
     ASSERT_TRUE(ExecEnv::GetInstance()
                         ->spill_file_mgr()
-                        ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream,
-                                            
print_id(_helper.runtime_state->query_id()),
-                                            "hash_build", 
probe_operator->node_id(),
-                                            std::numeric_limits<size_t>::max(),
-                                            local_state->operator_profile())
+                        ->create_spill_file(relative_path, spill_file)
                         .ok());
 
-    ASSERT_TRUE(spilled_stream->spill_eof().ok());
+    // Write nothing, just close
+    {
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
 
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 
     // Test error handling with fault injection
     SpillableDebugPointHelper dp_helper(
             "fault_inject::partitioned_hash_join_probe::recover_build_blocks");
-    bool has_data = false;
-    auto status = 
local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(),
-                                                              test_partition, 
has_data);
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    auto status = 
local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   
partition_info);
 
     ASSERT_FALSE(status.ok());
     ASSERT_TRUE(status.to_string().find("fault_inject 
partitioned_hash_join_probe "
@@ -907,7 +935,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
PushLargeBlock) {
     ASSERT_TRUE(found_probe_blocks);
 
     // Verify bytes counter
-    auto* probe_blocks_bytes = 
local_state->custom_profile()->get_counter("ProbeBloksBytesInMem");
+    auto* probe_blocks_bytes = 
local_state->custom_profile()->get_counter("ProbeBlocksBytesInMem");
     ASSERT_GT(probe_blocks_bytes->value(), 0);
 }
 
@@ -918,17 +946,20 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, PullBasic) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    local_state->_need_to_setup_internal_operators = true;
-    local_state->_partition_cursor = 0;
+    // Pre-initialize the spill queue with one empty partition (no build/probe 
files)
+    local_state->_spill_queue_initialized = true;
+    local_state->_need_to_setup_queue_partition = true;
+    
local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr,
 nullptr, 0));
 
     vectorized::Block test_block;
     bool eos = false;
 
     auto st = probe_operator->pull(_helper.runtime_state.get(), &test_block, 
&eos);
     ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string();
-    ASSERT_FALSE(eos) << "First pull should not be eos";
 
-    ASSERT_EQ(1, local_state->_partition_cursor) << "Partition cursor should 
be 1";
+    // After processing setup, _need_to_setup_queue_partition should be false
+    ASSERT_FALSE(local_state->_need_to_setup_queue_partition)
+            << "Partition setup should have been completed";
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, PullMultiplePartitions) {
@@ -937,28 +968,26 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
PullMultiplePartitions) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
+    // Pre-initialize the spill queue with multiple empty partitions
+    local_state->_spill_queue_initialized = true;
     for (uint32_t i = 0; i < 
PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) {
-        auto& probe_blocks = local_state->_probe_blocks[i];
-        probe_blocks.emplace_back(
-                
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}));
+        local_state->_spill_partition_queue.emplace_back(
+                JoinSpillPartitionInfo(nullptr, nullptr, 0));
     }
 
     vectorized::Block output_block;
     bool eos = false;
 
-    for (uint32_t i = 0; i < 
PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) {
-        local_state->_partition_cursor = i;
-        local_state->_need_to_setup_internal_operators = true;
+    // Process all partitions through the queue
+    int processed = 0;
+    while (!eos && processed < 
(int)PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT + 1) {
+        local_state->_need_to_setup_queue_partition = true;
 
         auto st = probe_operator->pull(_helper.runtime_state.get(), 
&output_block, &eos);
-        ASSERT_TRUE(st.ok()) << "Pull failed for partition " << i;
-
-        if (i == PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT - 1) {
-            ASSERT_TRUE(eos) << "Last partition should be eos";
-        } else {
-            ASSERT_FALSE(eos) << "Non-last partition should not be eos";
-        }
+        ASSERT_TRUE(st.ok()) << "Pull failed for iteration " << processed;
+        processed++;
     }
+    ASSERT_TRUE(eos) << "Should reach eos after all partitions are processed";
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithDiskRecovery) {
@@ -969,46 +998,67 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
PullWithDiskRecovery) {
 
     local_state->_shared_state->_is_spilled = true;
 
-    const uint32_t test_partition = 0;
-    auto& spilled_stream = 
local_state->_shared_state->spilled_streams[test_partition];
-    auto& spilling_file = local_state->_probe_spilling_files[test_partition];
-
-    local_state->_need_to_setup_internal_operators = true;
+    // Create build and probe spill files
+    vectorized::SpillFileSPtr build_file;
+    auto build_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(build_path, build_file)
+                        .ok());
 
-    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-            _helper.runtime_state.get(), spilled_stream,
-            print_id(_helper.runtime_state->query_id()), "hash_probe_spilled",
-            probe_operator->node_id(), std::numeric_limits<int32_t>::max(),
-            std::numeric_limits<size_t>::max(), 
local_state->operator_profile());
+    vectorized::SpillFileSPtr probe_file;
+    auto probe_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(probe_path, probe_file)
+                        .ok());
 
-    ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string();
-    st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-            _helper.runtime_state.get(), spilling_file, 
print_id(_helper.runtime_state->query_id()),
-            "hash_probe", probe_operator->node_id(), 
std::numeric_limits<int32_t>::max(),
-            std::numeric_limits<size_t>::max(), 
local_state->operator_profile());
+    // Write test data to build file
+    {
+        vectorized::Block spill_block =
+                
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(build_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
spill_block).ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
 
-    ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string();
+    // Write test data to probe file
+    {
+        vectorized::Block spill_block =
+                
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({4, 5, 6});
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(probe_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), 
spill_block).ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
 
-    vectorized::Block spill_block =
-            
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
-    st = spilled_stream->spill_block(_helper.runtime_state.get(), spill_block, 
true);
-    ASSERT_TRUE(st) << "Spill block failed: " << st.to_string();
-    st = spilling_file->spill_block(_helper.runtime_state.get(), spill_block, 
false);
-    ASSERT_TRUE(st) << "Spill block failed: " << st.to_string();
+    // Pre-initialize queue with one partition
+    local_state->_spill_queue_initialized = true;
+    local_state->_need_to_setup_queue_partition = true;
+    local_state->_spill_partition_queue.emplace_back(
+            JoinSpillPartitionInfo(build_file, probe_file, 0));
 
     vectorized::Block output_block;
     bool eos = false;
 
-    st = probe_operator->pull(_helper.runtime_state.get(), &output_block, 
&eos);
-    ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string();
-
-    st = probe_operator->pull(_helper.runtime_state.get(), &output_block, 
&eos);
-
+    // First pull should recover build data
+    auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, 
&eos);
     ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string();
     ASSERT_FALSE(eos) << "Should not be eos during disk recovery";
 
-    ASSERT_GT(local_state->_recovery_probe_rows->value(), 0)
-            << "Should have recovered some rows from disk";
+    ASSERT_GT(local_state->_recovery_build_rows->value(), 0)
+            << "Should have recovered some build rows from disk";
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithEmptyPartition) {
@@ -1017,20 +1067,22 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
PullWithEmptyPartition) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    // 设置空分区
-    local_state->_partition_cursor = 0;
-    local_state->_need_to_setup_internal_operators = true;
+    // Set up queue with an empty partition followed by another
+    local_state->_spill_queue_initialized = true;
+    local_state->_need_to_setup_queue_partition = true;
+    
local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr,
 nullptr, 0));
+    
local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr,
 nullptr, 0));
 
     vectorized::Block output_block;
     bool eos = false;
 
     auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, 
&eos);
     ASSERT_TRUE(st.ok()) << "Pull failed for empty partition";
-    ASSERT_FALSE(eos) << "Should not be eos for first empty partition";
+    ASSERT_FALSE(eos) << "Should not be eos since more partitions remain in 
queue";
 
-    // 验证分区游标已更新
-    ASSERT_EQ(1, local_state->_partition_cursor)
-            << "Partition cursor should move to next after empty partition";
+    // The first partition should have been popped from the queue
+    ASSERT_EQ(local_state->_spill_partition_queue.size(), 1u)
+            << "One partition should remain in queue after processing empty 
one";
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, Other) {
@@ -1041,10 +1093,276 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, Other) {
                                                         probe_operator.get(), 
shared_state);
 
     local_state->_shared_state->_is_spilled = true;
-    
ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get()));
 
-    auto st = probe_operator->_revoke_memory(_helper.runtime_state.get());
+    auto st = probe_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string();
 }
 
-} // namespace doris::pipeline
+// Test spill_probe_blocks with empty partitions (no data in any partition): the call must
+// succeed and report zero spilled probe rows.
+TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks_empty) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    // Partition probe rows by the left-hand side of the first equi-join conjunct.
+    RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0});
+    const auto& tnode = probe_operator->_tnode;
+    local_state->_partitioner = create_spill_partitioner(
+            _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT,
+            {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc);
+
+    // No data in any partition.
+    local_state->_shared_state->_is_spilled = true;
+    auto st = local_state->spill_probe_blocks(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "spill_probe_blocks with empty data failed: " << st.to_string();
+
+    // SpillProbeRows should be 0. Check the counter exists first so a missing counter fails
+    // the assertion instead of crashing the test binary with a null dereference.
+    auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows");
+    ASSERT_TRUE(write_rows_counter != nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 0);
+}
+
+// Test spill_probe_blocks with error injection: with the spill_block fault-injection debug
+// point active, spilling non-empty probe partitions must return a non-OK status.
+TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks_error) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    // Partition probe rows by the left-hand side of the first equi-join conjunct.
+    RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0});
+    const auto& tnode = probe_operator->_tnode;
+    local_state->_partitioner = create_spill_partitioner(
+            _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT,
+            {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc);
+
+    // Add data to partitions: one 3-row Int32 block per partition, so there is something to
+    // write and the injected write failure is actually reached.
+    for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) {
+        vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+                {1 * i, 2 * i, 3 * i});
+        local_state->_probe_blocks[i].emplace_back(std::move(block));
+    }
+
+    local_state->_shared_state->_is_spilled = true;
+
+    // Enable the "fault_inject::spill_file::spill_block" debug point for the rest of this
+    // scope (presumably the helper disables it again on destruction -- confirm).
+    SpillableDebugPointHelper dp_helper("fault_inject::spill_file::spill_block");
+    auto st = local_state->spill_probe_blocks(_helper.runtime_state.get());
+    ASSERT_FALSE(st.ok()) << "spill_probe_blocks should fail with error injection";
+}
+
+// Test push (data, then EOS) followed by spill_probe_blocks: every pushed row must land in
+// _probe_blocks, and spilling must account for all of them in SpillProbeRows.
+TEST_F(PartitionedHashJoinProbeOperatorTest, PushEosAndSpillProbe) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    // Partition probe rows by the left-hand side of the first equi-join conjunct.
+    RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0});
+    const auto& tnode = probe_operator->_tnode;
+    local_state->_partitioner = create_spill_partitioner(
+            _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT,
+            {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc);
+
+    // Push 5 rows, then an empty block with EOS.
+    vectorized::Block input_block =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4, 5});
+    auto st = probe_operator->push(_helper.runtime_state.get(), &input_block, false);
+    ASSERT_TRUE(st.ok()) << "Push failed: " << st.to_string();
+
+    input_block.clear();
+    st = probe_operator->push(_helper.runtime_state.get(), &input_block, true);
+    ASSERT_TRUE(st.ok()) << "Push EOS failed: " << st.to_string();
+
+    // Verify all data is in probe_blocks
+    int64_t total_rows = 0;
+    for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) {
+        for (const auto& block : local_state->_probe_blocks[i]) {
+            total_rows += block.rows();
+        }
+    }
+    ASSERT_EQ(total_rows, 5);
+
+    // Now spill the probe blocks
+    local_state->_shared_state->_is_spilled = true;
+    st = local_state->spill_probe_blocks(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "spill_probe_blocks failed: " << st.to_string();
+
+    // Guard against a missing counter so the test fails instead of crashing.
+    auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows");
+    ASSERT_TRUE(write_rows_counter != nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 5);
+
+    // Cleanup: release every spill group that was created, iterating the container itself
+    // rather than assuming its size equals TEST_PARTITION_COUNT.
+    for (auto& spilling_group : local_state->_probe_spilling_groups) {
+        if (spilling_group) {
+            ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spilling_group);
+            spilling_group.reset();
+        }
+    }
+}
+
+// Test recover_probe_blocks_from_partition with a spill file holding three 3-row blocks:
+// repeated calls must drain the file and recover all 9 rows in 3 blocks.
+TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeMultipleBlocks) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    // Create spill file with 3 blocks
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, spill_file)
+                        .ok());
+
+    {
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+
+        for (int batch = 0; batch < 3; ++batch) {
+            vectorized::Block block =
+                    vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+                            {batch * 10 + 1, batch * 10 + 2, batch * 10 + 3});
+            ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok());
+        }
+        ASSERT_TRUE(writer->close().ok());
+    }
+
+    // Recover all blocks. The loop relies on probe_file being reset once the file is
+    // exhausted; bound the number of calls so a regression fails instead of hanging.
+    constexpr int max_recover_calls = 100;
+    int recover_calls = 0;
+    JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0);
+    while (partition_info.probe_file) {
+        ASSERT_LT(recover_calls, max_recover_calls)
+                << "probe spill file not exhausted after " << recover_calls << " calls";
+        ++recover_calls;
+        auto st = local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   partition_info);
+        ASSERT_TRUE(st.ok()) << "recover_probe_blocks_from_partition failed: " << st.to_string();
+    }
+
+    // Verify all data recovered
+    int64_t total_rows = 0;
+    for (const auto& block : local_state->_queue_probe_blocks) {
+        total_rows += block.rows();
+    }
+    ASSERT_EQ(total_rows, 9);
+
+    auto* recovery_rows = local_state->custom_profile()->get_counter("SpillRecoveryProbeRows");
+    ASSERT_TRUE(recovery_rows != nullptr);
+    ASSERT_EQ(recovery_rows->value(), 9);
+    auto* recovery_blocks = local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks");
+    ASSERT_TRUE(recovery_blocks != nullptr);
+    ASSERT_EQ(recovery_blocks->value(), 3);
+}
+
+// Test recover_build_blocks_from_partition with a spill file holding three 2-row blocks:
+// repeated calls must drain the file and merge all 6 rows into _recovered_build_block.
+TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildMultipleBlocks) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    // Create spill file with 3 blocks
+    vectorized::SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, spill_file)
+                        .ok());
+
+    {
+        vectorized::SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+
+        for (int batch = 0; batch < 3; ++batch) {
+            vectorized::Block block =
+                    vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+                            {batch * 100 + 1, batch * 100 + 2});
+            ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok());
+        }
+        ASSERT_TRUE(writer->close().ok());
+    }
+
+    // Recover all blocks. The loop relies on build_file being reset once the file is
+    // exhausted; bound the number of calls so a regression fails instead of hanging.
+    constexpr int max_recover_calls = 100;
+    int recover_calls = 0;
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    while (partition_info.build_file) {
+        ASSERT_LT(recover_calls, max_recover_calls)
+                << "build spill file not exhausted after " << recover_calls << " calls";
+        ++recover_calls;
+        auto st = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   partition_info);
+        ASSERT_TRUE(st.ok()) << "recover_build_blocks_from_partition failed: " << st.to_string();
+    }
+
+    // Verify all data recovered
+    ASSERT_TRUE(local_state->_recovered_build_block != nullptr);
+    ASSERT_EQ(local_state->_recovered_build_block->rows(), 6);
+
+    auto* recovery_rows = local_state->custom_profile()->get_counter("SpillRecoveryBuildRows");
+    ASSERT_TRUE(recovery_rows != nullptr);
+    ASSERT_EQ(recovery_rows->value(), 6);
+    auto* recovery_blocks = local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks");
+    ASSERT_TRUE(recovery_blocks != nullptr);
+    ASSERT_EQ(recovery_blocks->value(), 3);
+}
+
+// Verifies that draining a spill queue made up solely of empty partitions (no build file,
+// no probe file) eventually reports EOS and leaves the queue empty.
+TEST_F(PartitionedHashJoinProbeOperatorTest, PullAllEmptyPartitions) {
+    auto [probe_op, sink_op] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_op.get(), shared_state);
+
+    // Seed an already-initialized queue with three file-less partitions.
+    local_state->_spill_queue_initialized = true;
+    for (int n = 0; n != 3; ++n) {
+        local_state->_spill_partition_queue.emplace_back(
+                JoinSpillPartitionInfo(nullptr, nullptr, 0));
+    }
+
+    vectorized::Block output_block;
+    bool eos = false;
+
+    // At most 10 pulls; each one is asked to set up the next queued partition.
+    for (int attempt = 0; !eos && attempt < 10; ++attempt) {
+        local_state->_need_to_setup_queue_partition = true;
+        auto pull_status = probe_op->pull(_helper.runtime_state.get(), &output_block, &eos);
+        ASSERT_TRUE(pull_status.ok()) << "Pull failed at iteration " << attempt;
+    }
+
+    ASSERT_TRUE(eos) << "Should reach EOS after processing all empty partitions";
+    ASSERT_TRUE(local_state->_spill_partition_queue.empty())
+            << "Queue should be empty after processing all partitions";
+}
+
+// Test JoinSpillPartitionInfo validity: a default-constructed info is invalid, while any
+// explicitly constructed info is valid regardless of which file pointers are set.
+TEST_F(PartitionedHashJoinProbeOperatorTest, JoinSpillPartitionInfoValidation) {
+    // Default constructed should be invalid
+    JoinSpillPartitionInfo default_info;
+    ASSERT_FALSE(default_info.is_valid());
+
+    // Constructed with a build file (and no probe file) should be valid
+    vectorized::SpillFileSPtr build_file;
+    auto relative_path =
+            fmt::format("{}/hash_build-test-{}", print_id(_helper.runtime_state->query_id()),
+                        ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, build_file)
+                        .ok());
+
+    JoinSpillPartitionInfo valid_info(build_file, nullptr, 1);
+    ASSERT_TRUE(valid_info.is_valid());
+    ASSERT_EQ(valid_info.level, 1);
+    ASSERT_TRUE(valid_info.build_file != nullptr);
+    ASSERT_TRUE(valid_info.probe_file == nullptr);
+
+    // Null files + explicit construction should still be valid
+    JoinSpillPartitionInfo null_files_info(nullptr, nullptr, 0);
+    ASSERT_TRUE(null_files_info.is_valid());
+}
+
+} // namespace doris::pipeline
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index a5adf33a4ac..d66764cf794 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -68,9 +68,8 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, Init) {
 
     tnode.row_tuples.push_back(desc_tbl.get_tuple_descs().front()->id());
 
-    PartitionedHashJoinSinkOperatorX operator_x(
-            _helper.obj_pool.get(), 0, 0, tnode, desc_tbl,
-            PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT);
+    PartitionedHashJoinSinkOperatorX operator_x(_helper.obj_pool.get(), 0, 0, 
tnode, desc_tbl);
+    operator_x._partition_count = 
PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT;
 
     auto child = std::make_shared<MockChildOperator>();
     child->_row_descriptor = RowDescriptor(_helper.runtime_state->desc_tbl(), 
{1});
@@ -163,9 +162,8 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitBuildExprs) 
{
     }
 
     DescriptorTbl desc_tbl;
-    PartitionedHashJoinSinkOperatorX operator_x(
-            _helper.obj_pool.get(), 0, 0, tnode, desc_tbl,
-            PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT);
+    PartitionedHashJoinSinkOperatorX operator_x(_helper.obj_pool.get(), 0, 0, 
tnode, desc_tbl);
+    operator_x._partition_count = 
PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT;
 
     ASSERT_TRUE(operator_x.init(tnode, _helper.runtime_state.get()));
     ASSERT_EQ(operator_x._build_exprs.size(), 4); // 1个初始 + 3个新增
@@ -290,7 +288,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, 
RevokeMemoryEmpty) {
 
     shared_state->_is_spilled = false;
     // Expect revoke memory to trigger spilling
-    auto status = sink_state->revoke_memory(_helper.runtime_state.get(), 
nullptr);
+    auto status = sink_state->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();
     ASSERT_TRUE(sink_state->_shared_state->_is_spilled);
 }
@@ -322,12 +320,12 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) 
{
     DCHECK_GE(sink_operator->_child->row_desc().get_column_id(1), 0);
 
     for (uint32_t i = 0; i != sink_operator->_partition_count; ++i) {
-        auto& spilling_file = sink_state->_shared_state->_spilled_files[i];
-        auto st = (ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-                _helper.runtime_state.get(), spilling_file,
-                print_id(_helper.runtime_state->query_id()), 
fmt::format("hash_build_sink_{}", i),
-                sink_operator->node_id(), std::numeric_limits<size_t>::max(),
-                sink_state->operator_profile()));
+        auto& spilling_file = 
sink_state->_shared_state->_spilled_build_groups[i];
+        auto relative_path = fmt::format(
+                "{}/hash_build_sink_{}-{}-{}", 
print_id(_helper.runtime_state->query_id()), i,
+                sink_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        auto st = 
ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path,
+                                                                              
spilling_file);
         ASSERT_TRUE(st.ok()) << "Register spill stream failed: " << 
st.to_string();
     }
 
@@ -352,7 +350,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) {
                                       "HashJoinBuildFinishDependency", true);
 
     // Expect revoke memory to trigger spilling
-    status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr);
+    status = sink_state->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();
     ASSERT_TRUE(sink_state->_shared_state->_is_spilled);
 
@@ -369,10 +367,98 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) 
{
 
     sink_state->_shared_state->_partitioned_build_blocks[0] =
             vectorized::MutableBlock::create_unique(std::move(large_block));
-    status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr);
+    status = sink_state->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();
 
     ASSERT_EQ(written_rows + 3 * 1024 * 1024, written_rows_counter->value());
 }
 
-} // namespace doris::pipeline
+// Test two consecutive revoke_memory cycles: round 1 spills rows held by the inner build
+// sink's mutable block, round 2 spills rows placed directly in a partitioned build block.
+// The SpillWriteRows counter must grow after each round.
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryMultipleCycles) {
+    auto [_, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                      sink_operator.get(), shared_state);
+
+    // The child must be the mock operator so its row descriptor can be overridden; fail
+    // cleanly rather than dereferencing a null pointer if it is not.
+    auto child = std::dynamic_pointer_cast<MockChildOperator>(sink_operator->child());
+    ASSERT_TRUE(child != nullptr);
+    RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {1});
+    child->_row_descriptor = row_desc;
+
+    // Partition build rows by the right-hand side of the first equi-join conjunct.
+    const auto& tnode = sink_operator->_tnode;
+    auto partitioner = std::make_unique<SpillPartitionerType>(sink_operator->_partition_count);
+    auto status = partitioner->init({tnode.hash_join_node.eq_join_conjuncts[0].right});
+    ASSERT_TRUE(status.ok()) << "partitioner init failed: " << status.to_string();
+    status = partitioner->prepare(_helper.runtime_state.get(), sink_operator->_child->row_desc());
+    ASSERT_TRUE(status.ok()) << "partitioner prepare failed: " << status.to_string();
+    sink_state->_partitioner = std::move(partitioner);
+    sink_state->_shared_state->_is_spilled = false;
+
+    // Setup spill files for all partitions
+    for (uint32_t i = 0; i != sink_operator->_partition_count; ++i) {
+        auto& spilling_file = sink_state->_shared_state->_spilled_build_groups[i];
+        auto relative_path = fmt::format(
+                "{}/hash_build_sink_{}-{}-{}", print_id(_helper.runtime_state->query_id()), i,
+                sink_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path,
+                                                                              spilling_file);
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Install a mock inner build-sink local state with the memory counters it reports through.
+    auto& inner_sink = sink_operator->_inner_sink_operator;
+    auto inner_sink_local_state = std::make_unique<MockHashJoinBuildSinkLocalState>(
+            inner_sink.get(), sink_state->_shared_state->_inner_runtime_state.get());
+    inner_sink_local_state->_hash_table_memory_usage =
+            sink_state->custom_profile()->add_counter("HashTableMemoryUsage", TUnit::BYTES);
+    inner_sink_local_state->_build_arena_memory_usage =
+            sink_state->operator_profile()->add_counter("BuildArenaMemoryUsage", TUnit::BYTES);
+
+    sink_state->_finish_dependency =
+            Dependency::create_shared(sink_operator->operator_id(), sink_operator->node_id(),
+                                      "HashJoinBuildFinishDependency", true);
+
+    // Round 1: small data held in the inner sink's build-side mutable block
+    auto block1 = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3});
+    inner_sink_local_state->_build_side_mutable_block = std::move(block1);
+    sink_state->_shared_state->_inner_runtime_state->emplace_sink_local_state(
+            0, std::move(inner_sink_local_state));
+
+    status = sink_state->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(status.ok()) << "Revoke memory round 1 failed: " << status.to_string();
+    ASSERT_TRUE(sink_state->_shared_state->_is_spilled);
+
+    auto* written_rows_counter = sink_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(written_rows_counter != nullptr);
+    auto round1_rows = written_rows_counter->value();
+    ASSERT_GT(round1_rows, 0) << "Should have spilled some rows in round 1";
+
+    // Round 2: more data via partitioned blocks. Partition 1 must exist before indexing it.
+    ASSERT_GT(sink_state->_shared_state->_partitioned_build_blocks.size(), 1u);
+    auto block2 =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({10, 20, 30, 40, 50});
+    sink_state->_shared_state->_partitioned_build_blocks[1] =
+            vectorized::MutableBlock::create_unique(std::move(block2));
+
+    status = sink_state->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(status.ok()) << "Revoke memory round 2 failed: " << status.to_string();
+
+    ASSERT_GT(written_rows_counter->value(), round1_rows)
+            << "Rows counter should have increased after round 2";
+}
+
+// Test that revocable_mem_size returns 0 immediately after revoking a sink that holds no
+// data, and that the revoke still marks the shared state as spilled.
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevocableMemSizeAfterRevoke) {
+    auto [_, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto* sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                       sink_operator.get(), shared_state);
+
+    shared_state->_is_spilled = false;
+
+    auto status = sink_state->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();
+    ASSERT_TRUE(sink_state->_shared_state->_is_spilled);
+
+    // After revoke with no data, revocable_mem_size should be 0
+    ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0);
+}
+
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
index 21e57daf36e..34d02881ead 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
@@ -112,10 +112,12 @@ PartitionedHashJoinTestHelper::create_operators() {
 
     EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 2);
 
-    auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
-            obj_pool.get(), tnode, 0, desc_tbl, TEST_PARTITION_COUNT);
-    auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
-            obj_pool.get(), 0, 0, tnode, desc_tbl, TEST_PARTITION_COUNT);
+    auto probe_operator =
+            
std::make_shared<PartitionedHashJoinProbeOperatorX>(obj_pool.get(), tnode, 0, 
desc_tbl);
+    probe_operator->_partition_count = TEST_PARTITION_COUNT;
+    auto sink_operator = 
std::make_shared<PartitionedHashJoinSinkOperatorX>(obj_pool.get(), 0, 0,
+                                                                            
tnode, desc_tbl);
+    sink_operator->_partition_count = TEST_PARTITION_COUNT;
 
     auto child_operator = std::make_shared<MockChildOperator>();
     auto probe_side_source_operator = std::make_shared<MockChildOperator>();
@@ -176,13 +178,12 @@ PartitionedHashJoinProbeLocalState* 
PartitionedHashJoinTestHelper::create_probe_
     local_state->init_spill_read_counters();
     local_state->init_spill_write_counters();
     local_state->init_counters();
-    local_state->_copy_shared_spill_profile = false;
     local_state->_internal_runtime_profile = 
std::make_unique<RuntimeProfile>("inner_test");
 
     local_state->_partitioned_blocks.resize(probe_operator->_partition_count);
-    
local_state->_probe_spilling_files.resize(probe_operator->_partition_count);
+    
local_state->_probe_spilling_groups.resize(probe_operator->_partition_count);
 
-    shared_state->_spilled_files.resize(probe_operator->_partition_count);
+    
shared_state->_spilled_build_groups.resize(probe_operator->_partition_count);
     
shared_state->_partitioned_build_blocks.resize(probe_operator->_partition_count);
 
     shared_state->_inner_runtime_state = std::make_unique<MockRuntimeState>();
@@ -211,12 +212,11 @@ PartitionedHashJoinSinkLocalState* 
PartitionedHashJoinTestHelper::create_sink_lo
             sink_operator->dests_id().front(), sink_operator->operator_id(),
             "PartitionedHashJoinTestDep");
 
-    shared_state->_spilled_files.resize(sink_operator->_partition_count);
+    
shared_state->_spilled_build_groups.resize(sink_operator->_partition_count);
     
shared_state->_partitioned_build_blocks.resize(sink_operator->_partition_count);
 
     shared_state->_inner_runtime_state = std::make_unique<MockRuntimeState>();
     shared_state->_inner_shared_state = 
std::make_shared<MockHashJoinSharedState>();
-    shared_state->setup_shared_profile(local_state->custom_profile());
 
     state->emplace_sink_local_state(sink_operator->operator_id(), 
std::move(local_state_uptr));
     return local_state;
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h 
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
index f0c21ce1ea2..43c8cdfc915 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
@@ -45,12 +45,12 @@ public:
     MockPartitionedHashJoinSharedState() {
+        // Start non-spilled, with no inner runtime state and empty per-partition
+        // containers; init(partition_count) resizes the containers.
         _is_spilled = false;
         _inner_runtime_state = nullptr;
-        _spilled_files.clear();
+        _spilled_build_groups.clear();
         _partitioned_build_blocks.clear();
     }
 
     void init(size_t partition_count) {
-        _spilled_files.resize(partition_count);
+        // Allocate one slot per partition in each per-partition container.
+        _spilled_build_groups.resize(partition_count);
         _partitioned_build_blocks.resize(partition_count);
     }
 };
diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp 
b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
index 4b3519dfec6..df7a4d30001 100644
--- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
@@ -232,7 +232,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill) {
     st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
     auto input_block2 = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
@@ -302,7 +302,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill2) {
     st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false);
     ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
 
     ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
@@ -361,9 +361,137 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpillError) {
 
     SpillableDebugPointHelper 
dp_helper("fault_inject::spill_file::spill_block");
 
-    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
 
     ASSERT_FALSE(st.ok()) << "spilll status should be failed";
 }
 
+// Test several consecutive sink -> revoke_memory rounds: every revoke must succeed, leave
+// nothing revocable, and contribute one sorted spill group.
+TEST_F(SpillSortSinkOperatorTest, SinkMultipleRevokes) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state =
+            std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state());
+    ASSERT_TRUE(shared_state != nullptr);
+
+    shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(),
+                                           "SpillSortSinkOperatorTest");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = {}};
+
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    // Perform `revoke_rounds` rounds of sink -> revoke. The spill-group check below uses the
+    // same constant so the loop count and the expectation cannot drift apart.
+    constexpr int revoke_rounds = 3;
+    for (int round = 0; round < revoke_rounds; ++round) {
+        auto input_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+                {1 + round, 2 + round, 3 + round, 4 + round, 5 + round});
+        input_block.insert(
+                vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
+                        {10, 9, 8, 7, 6}));
+
+        st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false);
+        ASSERT_TRUE(st.ok()) << "sink failed on round " << round << ": " << st.to_string();
+
+        st = sink_operator->revoke_memory(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "revoke_memory failed on round " << round << ": "
+                             << st.to_string();
+
+        ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0)
+                << "revocable_mem_size should be 0 after revoke on round " << round;
+    }
+
+    // After `revoke_rounds` rounds of spilling there should be one spill group per round.
+    // Compare as size_t to avoid a signed/unsigned comparison inside ASSERT_EQ.
+    ASSERT_EQ(shared_state->sorted_spill_groups.size(), static_cast<size_t>(revoke_rounds))
+            << "Should have 3 spill groups after 3 revokes";
+
+    ASSERT_TRUE(shared_state->is_spilled) << "is_spilled should be true after revoke";
+}
+
+// Test sinking one large block (100K rows), revoking it to disk, and verifying that the
+// SpillWriteRows counter accounts for every sunk row; then sink an empty EOS block.
+TEST_F(SpillSortSinkOperatorTest, SinkLargeDataWithSpill) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state =
+            std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state());
+    ASSERT_TRUE(shared_state != nullptr);
+
+    shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(),
+                                           "SpillSortSinkOperatorTest");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = {}};
+
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    // Create large data: an Int32 and an Int64 column, each holding 0..count-1.
+    const size_t count = 100000;
+    std::vector<int32_t> data(count);
+    std::iota(data.begin(), data.end(), 0);
+    std::vector<int64_t> data2(count);
+    std::iota(data2.begin(), data2.end(), 0);
+
+    auto input_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data);
+    input_block.insert(
+            vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(data2));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0);
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0);
+    ASSERT_TRUE(shared_state->is_spilled);
+
+    // The counter value is a signed int64_t while `count` is size_t; cast explicitly so
+    // ASSERT_EQ does not perform a signed/unsigned comparison.
+    auto* spill_write_rows = sink_local_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows != nullptr);
+    ASSERT_EQ(spill_write_rows->value(), static_cast<int64_t>(count))
+            << "SpillWriteRows should match the number of rows sunk";
+
+    // Sink empty EOS after spill
+    vectorized::Block empty_block;
+    st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true);
+    ASSERT_TRUE(st.ok()) << "sink eos failed: " << st.to_string();
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp 
b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
index b00dd66a1ad..7286ff58de8 100644
--- a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp
@@ -118,8 +118,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlock) {
     auto* local_state = 
_helper.runtime_state->get_local_state(source_operator->operator_id());
     ASSERT_TRUE(local_state != nullptr);
 
-    shared_state->setup_shared_profile(_helper.operator_profile.get());
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -197,8 +195,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
             
_helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    shared_state->setup_shared_profile(_helper.operator_profile.get());
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -216,11 +212,10 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
     // Prepare stored streams
     for (size_t i = 0; i != 4; ++i) {
         vectorized::SpillFileSPtr spill_file;
-        st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-                _helper.runtime_state.get(), spill_file,
-                print_id(_helper.runtime_state->query_id()), 
sink_operator->get_name(),
-                sink_operator->node_id(), std::numeric_limits<int32_t>::max(),
-                _helper.operator_profile.get());
+        auto relative_path = fmt::format("{}/{}-{}-{}", 
print_id(_helper.runtime_state->query_id()),
+                                         sink_operator->get_name(), 
sink_operator->node_id(),
+                                         
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        st = 
ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, 
spill_file);
         ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string();
 
         std::vector<int32_t> data;
@@ -236,10 +231,16 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
                 
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
                         data2));
 
-        st = spill_file->spill_block(_helper.runtime_state.get(), input_block, 
true);
-        ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string();
+        vectorized::SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_helper.runtime_state.get(), 
_helper.operator_profile.get(),
+                                       writer);
+        ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string();
+        st = writer->write_block(_helper.runtime_state.get(), input_block);
+        ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string();
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string();
 
-        shared_state->sorted_spill_files.emplace_back(std::move(spill_file));
+        shared_state->sorted_spill_groups.emplace_back(std::move(spill_file));
     }
 
     std::unique_ptr<vectorized::MutableBlock> mutable_block;
@@ -262,7 +263,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) {
     }
 
     ASSERT_TRUE(eos);
-    ASSERT_TRUE(shared_state->sorted_spill_files.empty()) << 
"sorted_spill_files is not empty";
+    ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << 
"sorted_spill_groups is not empty";
     ASSERT_TRUE(mutable_block) << "mutable_block is null";
     ASSERT_EQ(mutable_block->rows(), 40);
     auto output_block = mutable_block->to_block();
@@ -388,8 +389,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
             
_helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    shared_state->setup_shared_profile(_helper.operator_profile.get());
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -407,11 +406,10 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
     // Prepare stored streams
     for (size_t i = 0; i != 4; ++i) {
         vectorized::SpillFileSPtr spill_file;
-        st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-                _helper.runtime_state.get(), spill_file,
-                print_id(_helper.runtime_state->query_id()), 
sink_operator->get_name(),
-                sink_operator->node_id(), std::numeric_limits<int32_t>::max(),
-                std::numeric_limits<int32_t>::max(), 
_helper.operator_profile.get());
+        auto relative_path = fmt::format("{}/{}-{}-{}", 
print_id(_helper.runtime_state->query_id()),
+                                         sink_operator->get_name(), 
sink_operator->node_id(),
+                                         
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        st = 
ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, 
spill_file);
         ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string();
 
         std::vector<int32_t> data;
@@ -427,10 +425,16 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
                 
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
                         data2));
 
-        st = spill_file->spill_block(_helper.runtime_state.get(), input_block, 
true);
-        ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string();
+        vectorized::SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_helper.runtime_state.get(), 
_helper.operator_profile.get(),
+                                       writer);
+        ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string();
+        st = writer->write_block(_helper.runtime_state.get(), input_block);
+        ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string();
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string();
 
-        shared_state->sorted_spill_files.emplace_back(std::move(spill_file));
+        shared_state->sorted_spill_groups.emplace_back(std::move(spill_file));
     }
 
     auto query_options = _helper.runtime_state->query_options();
@@ -457,7 +461,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) {
         }
     }
 
-    ASSERT_TRUE(shared_state->sorted_spill_files.empty()) << 
"sorted_spill_files is not empty";
+    ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << 
"sorted_spill_groups is not empty";
     ASSERT_TRUE(mutable_block) << "mutable_block is null";
     ASSERT_EQ(mutable_block->rows(), 40);
     auto output_block = mutable_block->to_block();
@@ -535,8 +539,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) 
{
             
_helper.runtime_state->get_local_state(source_operator->operator_id()));
     ASSERT_TRUE(local_state != nullptr);
 
-    shared_state->setup_shared_profile(_helper.operator_profile.get());
-
     st = local_state->open(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
 
@@ -554,11 +556,10 @@ TEST_F(SpillSortSourceOperatorTest, 
GetBlockWithSpillError) {
     // Prepare stored streams
     for (size_t i = 0; i != 4; ++i) {
         vectorized::SpillFileSPtr spill_file;
-        st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-                _helper.runtime_state.get(), spill_file,
-                print_id(_helper.runtime_state->query_id()), 
sink_operator->get_name(),
-                sink_operator->node_id(), std::numeric_limits<int32_t>::max(),
-                std::numeric_limits<int32_t>::max(), 
_helper.operator_profile.get());
+        auto relative_path = fmt::format("{}/{}-{}-{}", 
print_id(_helper.runtime_state->query_id()),
+                                         sink_operator->get_name(), 
sink_operator->node_id(),
+                                         
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        st = 
ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, 
spill_file);
         ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string();
 
         std::vector<int32_t> data;
@@ -574,10 +575,16 @@ TEST_F(SpillSortSourceOperatorTest, 
GetBlockWithSpillError) {
                 
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
                         data2));
 
-        st = spill_file->spill_block(_helper.runtime_state.get(), input_block, 
true);
-        ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string();
+        vectorized::SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_helper.runtime_state.get(), 
_helper.operator_profile.get(),
+                                       writer);
+        ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string();
+        st = writer->write_block(_helper.runtime_state.get(), input_block);
+        ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string();
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string();
 
-        shared_state->sorted_spill_files.emplace_back(std::move(spill_file));
+        shared_state->sorted_spill_groups.emplace_back(std::move(spill_file));
     }
 
     SpillableDebugPointHelper 
dp_helper("fault_inject::spill_file::read_next_block");
@@ -613,4 +620,244 @@ TEST_F(SpillSortSourceOperatorTest, 
GetBlockWithSpillError) {
     ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
 }
 
+// Test reading from a single spill file to verify minimal sorted output.
+TEST_F(SpillSortSourceOperatorTest, GetBlockWithSingleSpillFile) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr && sink_operator != nullptr);
+
+    auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state =
+            std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state());
+    ASSERT_TRUE(shared_state != nullptr);
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    LocalSinkStateInfo sink_info {0, _helper.operator_profile.get(), -1, shared_state.get(), {},
+                                  {}};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = _helper.runtime_state->get_sink_local_state();
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    LocalStateInfo info {.parent_profile = _helper.operator_profile.get(),
+                         .scan_ranges = {},
+                         .shared_state = shared_state.get(),
+                         .shared_state_map = {},
+                         .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<SpillSortLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    shared_state->is_spilled = true;
+
+    auto* sorter = shared_state->in_mem_shared_state->sorter.get();
+    sorter->_sort_description.resize(sorter->_vsort_exec_exprs.ordering_expr_ctxs().size());
+    for (int i = 0; i < sorter->_sort_description.size(); i++) {
+        sorter->_sort_description[i].column_number = i;
+        sorter->_sort_description[i].direction = 1;
+        sorter->_sort_description[i].nulls_direction = 1;
+    }
+
+    // Spill files hold sorted runs (the sink sorts before spilling), so write ascending data.
+    {
+        vectorized::SpillFileSPtr spill_file;
+        auto relative_path = fmt::format("{}/{}-{}-{}", print_id(_helper.runtime_state->query_id()),
+                                         sink_operator->get_name(), sink_operator->node_id(),
+                                         ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+        st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, spill_file);
+        ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string();
+
+        auto input_block =
+                vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4, 5});
+        input_block.insert(
+                vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
+                        {10, 20, 30, 40, 50}));
+
+        vectorized::SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_helper.runtime_state.get(), _helper.operator_profile.get(),
+                                       writer);
+        ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string();
+        st = writer->write_block(_helper.runtime_state.get(), input_block);
+        ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string();
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string();
+
+        shared_state->sorted_spill_groups.emplace_back(std::move(spill_file));
+    }
+
+    // Read all blocks from source
+    std::unique_ptr<vectorized::MutableBlock> mutable_block;
+    bool eos = false;
+    while (!eos) {
+        vectorized::Block block;
+        shared_state->spill_block_batch_row_count = 100;
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        if (block.empty()) {
+            continue;
+        }
+        if (!mutable_block) {
+            mutable_block = vectorized::MutableBlock::create_unique(std::move(block));
+        } else {
+            st = mutable_block->merge(std::move(block));
+            ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+        }
+    }
+
+    ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << "sorted_spill_groups is not empty";
+    ASSERT_TRUE(mutable_block) << "mutable_block is null";
+    ASSERT_EQ(mutable_block->rows(), 5);
+
+    auto output_block = mutable_block->to_block();
+    const auto& col1 = output_block.get_by_position(0).column;
+
+    // Verify the exact ascending output {1, 2, 3, 4, 5}.
+    for (size_t i = 0; i < col1->size(); ++i) {
+        ASSERT_EQ(col1->get_int(i), static_cast<int64_t>(i) + 1) << "at index " << i;
+    }
+
+    st = local_state->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+// Test full pipeline: sink data → revoke → read back sorted from source.
+TEST_F(SpillSortSourceOperatorTest, EndToEndSinkAndSource) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr && sink_operator != nullptr);
+    auto tnode = _helper.create_test_plan_node();
+    auto shared_state =
+            std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state());
+    ASSERT_TRUE(shared_state != nullptr);
+
+    // Initialize and prepare both operators
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "sink init failed: " << st.to_string();
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "sink prepare failed: " << st.to_string();
+
+    st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "source init failed: " << st.to_string();
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "source prepare failed: " << st.to_string();
+
+    shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(),
+                                           "SpillSortSourceOperatorTest");
+
+    // Setup sink local state
+    LocalSinkStateInfo sink_info {0, _helper.operator_profile.get(), -1, shared_state.get(), {},
+                                  {}};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok()) << "sink setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "sink open failed: " << st.to_string();
+
+    // Setup source local state
+    LocalStateInfo source_info {.parent_profile = _helper.operator_profile.get(),
+                                .scan_ranges = {},
+                                .shared_state = shared_state.get(),
+                                .shared_state_map = {},
+                                .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), source_info);
+    ASSERT_TRUE(st.ok()) << "source setup_local_state failed: " << st.to_string();
+
+    auto* source_local_state = reinterpret_cast<SpillSortLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(source_local_state != nullptr);
+    st = source_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "source open failed: " << st.to_string();
+
+    // Sink batch 1: {5,3,1,4,2} → revoke
+    auto block1 =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({5, 3, 1, 4, 2});
+    block1.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
+            {50, 30, 10, 40, 20}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block1, false);
+    ASSERT_TRUE(st.ok()) << "sink batch 1 failed: " << st.to_string();
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    // Sink batch 2: {10,8,6,9,7}; not revoked here, the EOS below spills it
+    auto block2 =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({10, 8, 6, 9, 7});
+    block2.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(
+            {100, 80, 60, 90, 70}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block2, false);
+    ASSERT_TRUE(st.ok()) << "sink batch 2 failed: " << st.to_string();
+
+    // Sink EOS (triggers final revoke since is_spilled)
+    vectorized::Block empty_block;
+    st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true);
+    ASSERT_TRUE(st.ok()) << "sink eos failed: " << st.to_string();
+
+    ASSERT_TRUE(shared_state->is_spilled);
+    ASSERT_GE(shared_state->sorted_spill_groups.size(), 2u);
+
+    // Read back from source
+    auto* sorter = shared_state->in_mem_shared_state->sorter.get();
+    sorter->_sort_description.resize(sorter->_vsort_exec_exprs.ordering_expr_ctxs().size());
+    for (int i = 0; i < sorter->_sort_description.size(); i++) {
+        sorter->_sort_description[i].column_number = i;
+        sorter->_sort_description[i].direction = 1;
+        sorter->_sort_description[i].nulls_direction = 1;
+    }
+
+    std::unique_ptr<vectorized::MutableBlock> mutable_block;
+    bool eos = false;
+    while (!eos) {
+        vectorized::Block block;
+        shared_state->spill_block_batch_row_count = 100;
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        if (block.empty()) continue;
+        if (!mutable_block) {
+            mutable_block = vectorized::MutableBlock::create_unique(std::move(block));
+        } else {
+            st = mutable_block->merge(std::move(block));
+            ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+        }
+    }
+
+    ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << "sorted_spill_groups is not empty";
+    ASSERT_TRUE(mutable_block) << "mutable_block is null";
+    ASSERT_EQ(mutable_block->rows(), 10);
+
+    auto output_block = mutable_block->to_block();
+    const auto& col1 = output_block.get_by_position(0).column;
+
+    // Verify the exact sorted output: the inputs were a permutation of 1..10 split across
+    // two spilled runs, so the merged result must be exactly 1..10.
+    for (size_t i = 0; i < col1->size(); ++i) {
+        ASSERT_EQ(col1->get_int(i), static_cast<int64_t>(i) + 1)
+                << "Unexpected value at index " << i;
+    }
+
+    st = source_local_state->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "source local_state close failed: " << st.to_string();
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "source_operator close failed: " << st.to_string();
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp 
b/be/test/pipeline/operator/spill_sort_test_helper.cpp
index eb0b5341429..1a6eaafbab8 100644
--- a/be/test/pipeline/operator/spill_sort_test_helper.cpp
+++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp
@@ -130,7 +130,7 @@ SpillSortLocalState* 
SpillSortTestHelper::create_source_local_state(
     // Build a minimal local state manually.  Many tests prefer to use the
     // operators' own setup routines, but helper functions like this allow
     // individual units to be exercised without the full pipeline task.
-    auto local_state_uptr = 
std::make_unique<SpillSortLocalState>(source_operator, state);
+    auto local_state_uptr = std::make_unique<SpillSortLocalState>(state, 
source_operator);
     auto* local_state = local_state_uptr.get();
 
     shared_state = std::make_shared<MockSpillSortSharedState>();
@@ -144,7 +144,6 @@ SpillSortLocalState* 
SpillSortTestHelper::create_source_local_state(
     local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", 
TUnit::BYTES, "", 0);
     local_state->init_spill_read_counters();
     local_state->init_spill_write_counters();
-    local_state->_copy_shared_spill_profile = false;
     local_state->_internal_runtime_profile = 
std::make_unique<RuntimeProfile>("inner_test");
 
     state->emplace_local_state(source_operator->operator_id(), 
std::move(local_state_uptr));
@@ -170,8 +169,6 @@ SpillSortSinkLocalState* 
SpillSortTestHelper::create_sink_local_state(
             sink_operator->dests_id().front(), sink_operator->operator_id(),
             "SpillSortSinkTestDep");
 
-    shared_state->setup_shared_profile(local_state->custom_profile());
-
     state->emplace_sink_local_state(sink_operator->operator_id(), 
std::move(local_state_uptr));
     return local_state;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to