This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit e62d9244fb039fdbdaac987747c8ab4453c1450b Author: yiguolei <[email protected]> AuthorDate: Thu Mar 5 11:38:13 2026 +0800 fix ut test bug --- .../partitioned_aggregation_sink_operator_test.cpp | 121 +++- ...artitioned_aggregation_source_operator_test.cpp | 196 +++++- .../partitioned_aggregation_test_helper.cpp | 3 - .../partitioned_hash_join_probe_operator_test.cpp | 728 +++++++++++++++------ .../partitioned_hash_join_sink_operator_test.cpp | 118 +++- .../operator/partitioned_hash_join_test_helper.cpp | 18 +- .../operator/partitioned_hash_join_test_helper.h | 4 +- .../operator/spill_sort_sink_operator_test.cpp | 134 +++- .../operator/spill_sort_source_operator_test.cpp | 315 ++++++++- .../pipeline/operator/spill_sort_test_helper.cpp | 5 +- 10 files changed, 1349 insertions(+), 293 deletions(-) diff --git a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp index 4f07db2fa98..c0989702841 100644 --- a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp @@ -231,7 +231,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) { local_state->_runtime_state->get_sink_local_state()); ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); @@ -293,7 +293,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) { local_state->_runtime_state->get_sink_local_state()); ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); 
ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); @@ -355,7 +355,7 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) { local_state->_runtime_state->get_sink_local_state()); ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows"); @@ -430,8 +430,121 @@ TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) { ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); SpillableDebugPointHelper dp_helper("fault_inject::spill_file::spill_block"); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_FALSE(st.ok()) << "spilll status should be failed"; } +// Test multiple consecutive revoke_memory calls to verify repeated spilling works. 
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithMultipleRevokes) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state()); // NOTE(review): cast result is dereferenced below without a null check
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state()); // read from local_state's own _runtime_state, not _helper.runtime_state
+
+    // Round 1: sink five rows, check the inner hash table is non-empty, revoke, check it is empty again.
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4, 5});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 5}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink round 1 failed: " << st.to_string();
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke round 1 failed: " << st.to_string();
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    // Round 2: sink five different values from a second block and repeat the same before/after checks.
+    auto block2 =
+            vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({6, 7, 8, 9, 10});
+    block2.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {6, 7, 8, 9, 10}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block2, false);
+    ASSERT_TRUE(st.ok()) << "sink round 2 failed: " << st.to_string();
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke round 2 failed: " << st.to_string();
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // The SpillWriteRows counter must total 10 after the two revokes (5 rows were sunk before each one).
+    auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows_counter != nullptr);
+    ASSERT_EQ(spill_write_rows_counter->value(), 10) << "SpillWriteRows should be 10 (5 per round)";
+
+    // Finish by sinking the emptied round-1 block with the end-of-stream flag set to true.
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink eos failed: " << st.to_string();
+}
+
+// Test revoke_memory when hash table is empty (no data sunk).
+TEST_F(PartitionedAggregationSinkOperatorTest, RevokeMemoryEmpty) {
+    auto [source_operator, sink_operator] = _helper.create_operators(); // NOTE(review): used below without the nullptr checks SinkWithMultipleRevokes performs
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Nothing has been sunk: revoke_memory must still return OK, and the test expects _is_spilled to be set.
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory on empty should succeed: " << st.to_string();
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    auto* spill_write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows_counter != nullptr); // the counter must exist even though no rows were sunk
+    ASSERT_EQ(spill_write_rows_counter->value(), 0); // the empty revoke must not have counted any written rows
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
index 7bf803efcb4..0a5f644faf1 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
+++ 
b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp @@ -138,8 +138,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - local_state->_copy_shared_spill_profile = false; - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -228,8 +226,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - local_state->_copy_shared_spill_profile = false; - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -297,10 +293,10 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) { st = sink_operator->sink(_helper.runtime_state.get(), &block, false); ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); - ASSERT_TRUE(shared_state->is_spilled); + ASSERT_TRUE(shared_state->_is_spilled); st = sink_operator->sink(_helper.runtime_state.get(), &block, true); ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); @@ -325,8 +321,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - local_state->_copy_shared_spill_profile = false; - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -399,10 +393,10 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { st = sink_operator->sink(_helper.runtime_state.get(), &block, false); ASSERT_TRUE(st.ok()) << "sink 
failed: " << st.to_string(); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); - ASSERT_TRUE(shared_state->is_spilled); + ASSERT_TRUE(shared_state->_is_spilled); st = sink_operator->sink(_helper.runtime_state.get(), &block, true); ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); @@ -427,8 +421,6 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - local_state->_copy_shared_spill_profile = false; - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -444,4 +436,182 @@ TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { ASSERT_FALSE(st.ok()); } -} // namespace doris::pipeline + +// Test spill → recover cycle with large data to verify all rows come back. 
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillLargeData) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.operator_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .shared_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok());
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Fill `data` with the 10000 distinct values 0..9999; it feeds both create_block and the inserted column.
+    const size_t count = 10000;
+    std::vector<int32_t> data(count);
+    std::iota(data.begin(), data.end(), 0); // NOTE(review): std::iota lives in <numeric>; confirm this file includes it
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data);
+    block.insert(
+            vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(data));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // Finish the sink side: sink the emptied block with the end-of-stream flag set to true.
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok());
+
+    // Set up the source operator on the same shared_state and read everything back.
+    LocalStateInfo info {.parent_profile = _helper.operator_profile.get(),
+                         .scan_ranges = {},
+                         .shared_state = shared_state.get(),
+                         .shared_state_map = {},
+                         .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    block.clear();
+    bool eos = false;
+    size_t rows = 0;
+    while (!eos) {
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        rows += block.rows(); // count this batch before the block is emptied for the next call
+        block.clear_column_data();
+    }
+
+    ASSERT_TRUE(eos);
+    // With GROUP BY, each distinct key should come back once, so the row total must equal `count`.
+    ASSERT_EQ(rows, count) << "Expected " << count << " distinct rows";
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+}
+
+// Test multiple spill+recover cycles: sink → revoke → sink more → revoke → eos → source.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithMultipleSpills) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.operator_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .shared_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok());
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Round 1: sink the values {1,2,3,4}, revoke, and check the shared state is marked as spilled.
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(shared_state->_is_spilled);
+
+    // Round 2: reassign `block` with the values {5,6,7,8}, sink it, and revoke a second time.
+    block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({5, 6, 7, 8});
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {5, 6, 7, 8}));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok());
+    st = sink_operator->revoke_memory(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    // Finish the sink side: sink the emptied block with the end-of-stream flag set to true.
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok());
+
+    // Set up the source operator on the same shared_state and read everything back.
+    LocalStateInfo info {.parent_profile = _helper.operator_profile.get(),
+                         .scan_ranges = {},
+                         .shared_state = shared_state.get(),
+                         .shared_state_map = {},
+                         .task_idx = 0};
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok());
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+
+    block.clear();
+    bool eos = false;
+    size_t rows = 0;
+    while (!eos) {
+        st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+        ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+        rows += block.rows(); // count this batch before the block is emptied for the next call
+        block.clear_column_data();
+    }
+
+    ASSERT_TRUE(eos);
+    ASSERT_EQ(rows, 8) << "Should recover all 8 distinct rows from 2 spill rounds"; // 4 values per round, no value shared between rounds
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok());
+} // NOTE(review): this change removes the old "} // namespace doris::pipeline" line and no replacement follows this test; confirm the namespace is still closed
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index caa763aa7c2..52e0b64e2ae 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -201,7 +201,6 @@ PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_
     local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
local_state->init_spill_read_counters(); local_state->init_spill_write_counters(); - local_state->_copy_shared_spill_profile = false; local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test"); state->emplace_local_state(probe_operator->operator_id(), std::move(local_state_uptr)); @@ -224,8 +223,6 @@ PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_loca sink_operator->dests_id().front(), sink_operator->operator_id(), "PartitionedHashJoinTestDep"); - shared_state->setup_shared_profile(local_state->custom_profile()); - state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr)); return local_state; } diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp index 091187afdfe..8146957cefe 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp @@ -212,15 +212,15 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks) { std::cout << "profile: " << local_state->custom_profile()->pretty_print() << std::endl; for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) { - if (!local_state->_probe_spilling_files[i]) { + if (!local_state->_probe_spilling_groups[i]) { continue; } ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file( - local_state->_probe_spilling_files[i]); - local_state->_probe_spilling_files[i].reset(); + local_state->_probe_spilling_groups[i]); + local_state->_probe_spilling_groups[i].reset(); } - auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillWriteRows"); + auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows"); ASSERT_EQ(write_rows_counter->value(), (PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT / 2) * 3 + 3 * 1024 * 1024); } @@ -232,38 +232,40 @@ 
TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDisk) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Create and register a spill stream for testing - const uint32_t test_partition = 0; - auto& spill_file = local_state->_probe_spilling_files[test_partition]; + // Create and register a spill file for testing + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spill_file, - print_id(_helper.runtime_state->query_id()), - "hash_probe", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - // Write some test data to spill stream + // Write some test data to spill file { vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), block, false).ok()); - ASSERT_TRUE(spill_file->spill_eof().ok()); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + ASSERT_TRUE(writer->close().ok()); } - // Test recovery - bool has_data = false; + // Test recovery using JoinSpillPartitionInfo + JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0); ASSERT_TRUE(local_state - ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_probe_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); - ASSERT_TRUE(has_data); std::cout << "profile: " << 
local_state->custom_profile()->pretty_print() << std::endl; - // Verify recovered data - auto& probe_blocks = local_state->_probe_blocks[test_partition]; + // Verify recovered data (now in _queue_probe_blocks) + auto& probe_blocks = local_state->_queue_probe_blocks; ASSERT_FALSE(probe_blocks.empty()); ASSERT_EQ(probe_blocks[0].rows(), 3); @@ -272,11 +274,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDisk) { local_state->custom_profile()->get_counter("SpillRecoveryProbeRows"); ASSERT_EQ(recovery_rows_counter->value(), 3); auto* recovery_blocks_counter = - local_state->custom_profile()->get_counter("SpillReadBlockCount"); + local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks"); ASSERT_EQ(recovery_blocks_counter->value(), 1); // Verify stream cleanup - ASSERT_EQ(local_state->_probe_spilling_files[test_partition], nullptr); + ASSERT_EQ(partition_info.probe_file, nullptr); } TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData) { @@ -286,19 +288,17 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Create and register a spill stream for testing - const uint32_t test_partition = 0; - auto& spill_file = local_state->_probe_spilling_files[test_partition]; + // Create and register a spill file for testing + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spill_file, - print_id(_helper.runtime_state->query_id()), - "hash_probe", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, 
spill_file) .ok()); - // Write some test data to spill stream + // Write some test data to spill file { // create block larger than 32MB(4 * (8 * 1024 * 1024 + 10)) std::vector<int32_t> large_data(8 * 1024 * 1024 + 10); @@ -306,41 +306,51 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData vectorized::Block large_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(large_data); - ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), large_block, false).ok()); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), large_block).ok()); vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - ASSERT_TRUE(spill_file->spill_block(_helper.runtime_state.get(), block, false).ok()); - ASSERT_TRUE(spill_file->spill_eof().ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + ASSERT_TRUE(writer->close().ok()); } - // Test recovery - bool has_data = true; - while (has_data) { + // Test recovery using JoinSpillPartitionInfo + JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0); + while (partition_info.probe_file) { ASSERT_TRUE(local_state - ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_probe_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); } std::cout << "profile: " << local_state->custom_profile()->pretty_print() << std::endl; - // Verify recovered data - auto& probe_blocks = local_state->_probe_blocks[test_partition]; + // Verify recovered data (now in _queue_probe_blocks) + auto& probe_blocks = local_state->_queue_probe_blocks; ASSERT_FALSE(probe_blocks.empty()); - ASSERT_EQ(probe_blocks[0].rows(), 8 * 1024 * 1024 + 10); - ASSERT_EQ(probe_blocks[1].rows(), 3); + + // 
Count total recovered rows + int64_t total_rows = 0; + for (const auto& block : probe_blocks) { + total_rows += block.rows(); + } + ASSERT_EQ(total_rows, 8 * 1024 * 1024 + 10 + 3); // Verify counters auto* recovery_rows_counter = local_state->custom_profile()->get_counter("SpillRecoveryProbeRows"); ASSERT_EQ(recovery_rows_counter->value(), 3 + 8 * 1024 * 1024 + 10); auto* recovery_blocks_counter = - local_state->custom_profile()->get_counter("SpillReadBlockCount"); + local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks"); ASSERT_EQ(recovery_blocks_counter->value(), 2); // Verify stream cleanup - ASSERT_EQ(local_state->_probe_spilling_files[test_partition], nullptr); + ASSERT_EQ(partition_info.probe_file, nullptr); } TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskEmpty) { @@ -350,31 +360,35 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskEmpty) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Test multiple cases - const uint32_t test_partition = 0; - - auto& spilled_stream = local_state->_probe_spilling_files[test_partition]; + // Create an empty spill file + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spilled_stream, - print_id(_helper.runtime_state->query_id()), - "hash_probe", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - ASSERT_TRUE(spilled_stream->spill_eof().ok()); + // Write nothing, just close the writer + { + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + 
local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->close().ok()); + } - bool has_data = false; + JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0); ASSERT_TRUE(local_state - ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_probe_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); - ASSERT_TRUE(has_data); - ASSERT_TRUE(local_state->_probe_blocks[test_partition].empty()) - << "probe blocks not empty: " << local_state->_probe_blocks[test_partition].size(); + ASSERT_TRUE(local_state->_queue_probe_blocks.empty()) + << "probe blocks not empty: " << local_state->_queue_probe_blocks.size(); - ASSERT_TRUE(spilled_stream == nullptr); + ASSERT_TRUE(partition_info.probe_file == nullptr); } TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) { @@ -384,34 +398,36 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Test multiple cases - const uint32_t test_partition = 0; - - auto& spilling_file = local_state->_probe_spilling_files[test_partition]; + // Create a spill file and write data + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spilling_file, - print_id(_helper.runtime_state->query_id()), - "hash_probe", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - // Write some test data to spill stream + // Write some test data to spill file { vectorized::Block block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - ASSERT_TRUE(spilling_file->spill_block(_helper.runtime_state.get(), block, false).ok()); - ASSERT_TRUE(spilling_file->spill_eof().ok()); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + ASSERT_TRUE(writer->close().ok()); } SpillableDebugPointHelper dp_helper("fault_inject::spill_file::read_next_block"); - bool has_data = false; - auto status = local_state->recover_probe_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data); + JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0); + auto status = local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(), + partition_info); - ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spilling_file); - spilling_file.reset(); + ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spill_file); + spill_file.reset(); ASSERT_FALSE(status.ok()); ASSERT_TRUE(status.to_string().find("fault_inject spill_file read_next_block") != @@ -428,33 +444,35 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Create and register spill stream with test data - const uint32_t test_partition = 0; - auto& spilled_stream = local_state->_shared_state->_spilled_files[test_partition]; + // Create and register spill file with test data + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), 
spilled_stream, - print_id(_helper.runtime_state->query_id()), - "hash_build", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); // Write test data { vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); - ASSERT_TRUE(spilled_stream->spill_eof().ok()); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + ASSERT_TRUE(writer->close().ok()); } - // Test recovery - bool has_data = false; + // Test recovery using JoinSpillPartitionInfo + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); ASSERT_TRUE(local_state - ->recover_build_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); - ASSERT_TRUE(has_data); // Verify recovered data ASSERT_TRUE(local_state->_recovered_build_block != nullptr); @@ -465,11 +483,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) { local_state->custom_profile()->get_counter("SpillRecoveryBuildRows"); ASSERT_EQ(recovery_rows_counter->value(), 3); auto* recovery_blocks_counter = - local_state->custom_profile()->get_counter("SpillReadBlockCount"); + local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks"); ASSERT_EQ(recovery_blocks_counter->value(), 1); // Verify stream cleanup - ASSERT_EQ(local_state->_shared_state->_spilled_files[test_partition], nullptr); + ASSERT_EQ(partition_info.build_file, nullptr); } TEST_F(PartitionedHashJoinProbeOperatorTest, need_more_input_data) { @@ -554,11 +572,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
get_reserve_mem_size) { local_state->_shared_state->_is_spilled = true; local_state->_child_eos = false; - local_state->_need_to_setup_internal_operators = false; + local_state->_need_to_setup_queue_partition = false; ASSERT_EQ(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM); - local_state->_need_to_setup_internal_operators = true; + local_state->_need_to_setup_queue_partition = true; ASSERT_GT(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM); @@ -583,28 +601,33 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskEmpty) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Test empty stream - const uint32_t test_partition = 0; - auto& spilled_stream = local_state->_shared_state->_spilled_files[test_partition]; + // Create an empty spill file + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spilled_stream, - print_id(_helper.runtime_state->query_id()), - "hash_build", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - ASSERT_TRUE(spilled_stream->spill_eof().ok()); + // Write nothing, just close the writer + { + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->close().ok()); + } - bool has_data = false; + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); ASSERT_TRUE(local_state - 
->recover_build_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); - ASSERT_TRUE(has_data); - ASSERT_EQ(spilled_stream, nullptr); + ASSERT_EQ(partition_info.build_file, nullptr); ASSERT_TRUE(local_state->_recovered_build_block == nullptr); } @@ -616,19 +639,17 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Test empty stream - const uint32_t test_partition = 0; - auto& spilled_stream = local_state->_shared_state->_spilled_files[test_partition]; + // Create spill file for large data test + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spilled_stream, - print_id(_helper.runtime_state->query_id()), - "hash_build", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - // Write some test data to spill stream + // Write some test data to spill file { // create block larger than 32MB(4 * (8 * 1024 * 1024 + 10)) std::vector<int32_t> large_data(8 * 1024 * 1024 + 10); @@ -636,26 +657,28 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData vectorized::Block large_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(large_data); - ASSERT_TRUE( - spilled_stream->spill_block(_helper.runtime_state.get(), large_block, false).ok()); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + 
local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), large_block).ok()); vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + ASSERT_TRUE(writer->close().ok()); } - ASSERT_TRUE(spilled_stream->spill_eof().ok()); - bool has_data = false; - do { + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); + while (partition_info.build_file) { ASSERT_TRUE(local_state - ->recover_build_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data) + ->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info) .ok()); ASSERT_TRUE(local_state->_recovered_build_block); - } while (has_data); - - ASSERT_EQ(spilled_stream, nullptr); + } // Verify recovered data ASSERT_EQ(local_state->_recovered_build_block->rows(), 8 * 1024 * 1024 + 10 + 3); @@ -665,12 +688,11 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData local_state->custom_profile()->get_counter("SpillRecoveryBuildRows"); ASSERT_EQ(recovery_rows_counter->value(), 8 * 1024 * 1024 + 10 + 3); auto* recovery_blocks_counter = - local_state->custom_profile()->get_counter("SpillReadBlockCount"); + local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks"); ASSERT_EQ(recovery_blocks_counter->value(), 2); } TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) { - // Similar setup code as above... // Similar setup as above... 
auto [probe_operator, sink_operator] = _helper.create_operators(); @@ -678,28 +700,34 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // Test empty stream - const uint32_t test_partition = 0; - auto& spilled_stream = local_state->_shared_state->_spilled_files[test_partition]; + // Create an empty spill file + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); ASSERT_TRUE(ExecEnv::GetInstance() ->spill_file_mgr() - ->create_spill_file(_helper.runtime_state.get(), spilled_stream, - print_id(_helper.runtime_state->query_id()), - "hash_build", probe_operator->node_id(), - std::numeric_limits<size_t>::max(), - local_state->operator_profile()) + ->create_spill_file(relative_path, spill_file) .ok()); - ASSERT_TRUE(spilled_stream->spill_eof().ok()); + // Write nothing, just close + { + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->close().ok()); + } ASSERT_TRUE(local_state->_recovered_build_block == nullptr); // Test error handling with fault injection SpillableDebugPointHelper dp_helper( "fault_inject::partitioned_hash_join_probe::recover_build_blocks"); - bool has_data = false; - auto status = local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(), - test_partition, has_data); + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); + auto status = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info); ASSERT_FALSE(status.ok()); ASSERT_TRUE(status.to_string().find("fault_inject partitioned_hash_join_probe " @@ -907,7 +935,7 @@ 
TEST_F(PartitionedHashJoinProbeOperatorTest, PushLargeBlock) { ASSERT_TRUE(found_probe_blocks); // Verify bytes counter - auto* probe_blocks_bytes = local_state->custom_profile()->get_counter("ProbeBloksBytesInMem"); + auto* probe_blocks_bytes = local_state->custom_profile()->get_counter("ProbeBlocksBytesInMem"); ASSERT_GT(probe_blocks_bytes->value(), 0); } @@ -918,17 +946,20 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, PullBasic) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - local_state->_need_to_setup_internal_operators = true; - local_state->_partition_cursor = 0; + // Pre-initialize the spill queue with one empty partition (no build/probe files) + local_state->_spill_queue_initialized = true; + local_state->_need_to_setup_queue_partition = true; + local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr, nullptr, 0)); vectorized::Block test_block; bool eos = false; auto st = probe_operator->pull(_helper.runtime_state.get(), &test_block, &eos); ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); - ASSERT_FALSE(eos) << "First pull should not be eos"; - ASSERT_EQ(1, local_state->_partition_cursor) << "Partition cursor should be 1"; + // After processing setup, _need_to_setup_queue_partition should be false + ASSERT_FALSE(local_state->_need_to_setup_queue_partition) + << "Partition setup should have been completed"; } TEST_F(PartitionedHashJoinProbeOperatorTest, PullMultiplePartitions) { @@ -937,28 +968,26 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, PullMultiplePartitions) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); + // Pre-initialize the spill queue with multiple empty partitions + local_state->_spill_queue_initialized = true; for (uint32_t i = 0; i < PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) { - auto& probe_blocks = local_state->_probe_blocks[i]; - 
probe_blocks.emplace_back( - vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3})); + local_state->_spill_partition_queue.emplace_back( + JoinSpillPartitionInfo(nullptr, nullptr, 0)); } vectorized::Block output_block; bool eos = false; - for (uint32_t i = 0; i < PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) { - local_state->_partition_cursor = i; - local_state->_need_to_setup_internal_operators = true; + // Process all partitions through the queue + int processed = 0; + while (!eos && processed < (int)PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT + 1) { + local_state->_need_to_setup_queue_partition = true; auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); - ASSERT_TRUE(st.ok()) << "Pull failed for partition " << i; - - if (i == PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT - 1) { - ASSERT_TRUE(eos) << "Last partition should be eos"; - } else { - ASSERT_FALSE(eos) << "Non-last partition should not be eos"; - } + ASSERT_TRUE(st.ok()) << "Pull failed for iteration " << processed; + processed++; } + ASSERT_TRUE(eos) << "Should reach eos after all partitions are processed"; } TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithDiskRecovery) { @@ -969,46 +998,67 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithDiskRecovery) { local_state->_shared_state->_is_spilled = true; - const uint32_t test_partition = 0; - auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; - auto& spilling_file = local_state->_probe_spilling_files[test_partition]; - - local_state->_need_to_setup_internal_operators = true; + // Create build and probe spill files + vectorized::SpillFileSPtr build_file; + auto build_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(build_path, 
build_file) + .ok()); - auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - _helper.runtime_state.get(), spilled_stream, - print_id(_helper.runtime_state->query_id()), "hash_probe_spilled", - probe_operator->node_id(), std::numeric_limits<int32_t>::max(), - std::numeric_limits<size_t>::max(), local_state->operator_profile()); + vectorized::SpillFileSPtr probe_file; + auto probe_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(probe_path, probe_file) + .ok()); - ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string(); - st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - _helper.runtime_state.get(), spilling_file, print_id(_helper.runtime_state->query_id()), - "hash_probe", probe_operator->node_id(), std::numeric_limits<int32_t>::max(), - std::numeric_limits<size_t>::max(), local_state->operator_profile()); + // Write test data to build file + { + vectorized::Block spill_block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(build_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), spill_block).ok()); + ASSERT_TRUE(writer->close().ok()); + } - ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string(); + // Write test data to probe file + { + vectorized::Block spill_block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({4, 5, 6}); + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(probe_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), spill_block).ok()); + 
ASSERT_TRUE(writer->close().ok()); + } - vectorized::Block spill_block = - vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); - st = spilled_stream->spill_block(_helper.runtime_state.get(), spill_block, true); - ASSERT_TRUE(st) << "Spill block failed: " << st.to_string(); - st = spilling_file->spill_block(_helper.runtime_state.get(), spill_block, false); - ASSERT_TRUE(st) << "Spill block failed: " << st.to_string(); + // Pre-initialize queue with one partition + local_state->_spill_queue_initialized = true; + local_state->_need_to_setup_queue_partition = true; + local_state->_spill_partition_queue.emplace_back( + JoinSpillPartitionInfo(build_file, probe_file, 0)); vectorized::Block output_block; bool eos = false; - st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); - ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); - - st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); - + // First pull should recover build data + auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); ASSERT_FALSE(eos) << "Should not be eos during disk recovery"; - ASSERT_GT(local_state->_recovery_probe_rows->value(), 0) - << "Should have recovered some rows from disk"; + ASSERT_GT(local_state->_recovery_build_rows->value(), 0) + << "Should have recovered some build rows from disk"; } TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithEmptyPartition) { @@ -1017,20 +1067,22 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithEmptyPartition) { auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - // 设置空分区 - local_state->_partition_cursor = 0; - local_state->_need_to_setup_internal_operators = true; + // Set up queue with an empty partition followed by another + local_state->_spill_queue_initialized = true; + local_state->_need_to_setup_queue_partition = 
true; + local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr, nullptr, 0)); + local_state->_spill_partition_queue.emplace_back(JoinSpillPartitionInfo(nullptr, nullptr, 0)); vectorized::Block output_block; bool eos = false; auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); ASSERT_TRUE(st.ok()) << "Pull failed for empty partition"; - ASSERT_FALSE(eos) << "Should not be eos for first empty partition"; + ASSERT_FALSE(eos) << "Should not be eos since more partitions remain in queue"; - // 验证分区游标已更新 - ASSERT_EQ(1, local_state->_partition_cursor) - << "Partition cursor should move to next after empty partition"; + // The first partition should have been popped from the queue + ASSERT_EQ(local_state->_spill_partition_queue.size(), 1u) + << "One partition should remain in queue after processing empty one"; } TEST_F(PartitionedHashJoinProbeOperatorTest, Other) { @@ -1041,10 +1093,276 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, Other) { probe_operator.get(), shared_state); local_state->_shared_state->_is_spilled = true; - ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get())); - auto st = probe_operator->_revoke_memory(_helper.runtime_state.get()); + auto st = probe_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string(); } -} // namespace doris::pipeline +// Test spill_probe_blocks with empty partitions (no data in any partition). 
+TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks_empty) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // No data in any partition + local_state->_shared_state->_is_spilled = true; + auto st = local_state->spill_probe_blocks(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "spill_probe_blocks with empty data failed: " << st.to_string(); + + // SpillProbeRows should be 0 + auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows"); + ASSERT_EQ(write_rows_counter->value(), 0); +} + +// Test spill_probe_blocks with error injection. 
+TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks_error) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // Add data to partitions + for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) { + vectorized::Block block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1 * i, 2 * i, 3 * i}); + local_state->_probe_blocks[i].emplace_back(std::move(block)); + } + + local_state->_shared_state->_is_spilled = true; + + SpillableDebugPointHelper dp_helper("fault_inject::spill_file::spill_block"); + auto st = local_state->spill_probe_blocks(_helper.runtime_state.get()); + ASSERT_FALSE(st.ok()) << "spill_probe_blocks should fail with error injection"; +} + +// Test PushWithEOS followed by spill_probe_blocks for spilled partitions. 
+TEST_F(PartitionedHashJoinProbeOperatorTest, PushEosAndSpillProbe) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // Push data → EOS + vectorized::Block input_block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3, 4, 5}); + auto st = probe_operator->push(_helper.runtime_state.get(), &input_block, false); + ASSERT_TRUE(st.ok()) << "Push failed: " << st.to_string(); + + input_block.clear(); + st = probe_operator->push(_helper.runtime_state.get(), &input_block, true); + ASSERT_TRUE(st.ok()) << "Push EOS failed: " << st.to_string(); + + // Verify all data is in probe_blocks + int64_t total_rows = 0; + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + for (const auto& block : local_state->_probe_blocks[i]) { + total_rows += block.rows(); + } + } + ASSERT_EQ(total_rows, 5); + + // Now spill the probe blocks + local_state->_shared_state->_is_spilled = true; + st = local_state->spill_probe_blocks(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "spill_probe_blocks failed: " << st.to_string(); + + auto* write_rows_counter = local_state->custom_profile()->get_counter("SpillProbeRows"); + ASSERT_EQ(write_rows_counter->value(), 5); + + // Cleanup + for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) { + if (local_state->_probe_spilling_groups[i]) { + ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file( + local_state->_probe_spilling_groups[i]); + 
local_state->_probe_spilling_groups[i].reset(); + } + } +} + +// Test RecoverProbeBlocks with multiple blocks in one spill file. +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeMultipleBlocks) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create spill file with 3 blocks + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_probe-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(relative_path, spill_file) + .ok()); + + { + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + + for (int batch = 0; batch < 3; ++batch) { + vectorized::Block block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {batch * 10 + 1, batch * 10 + 2, batch * 10 + 3}); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + } + ASSERT_TRUE(writer->close().ok()); + } + + // Recover all blocks + JoinSpillPartitionInfo partition_info(nullptr, spill_file, 0); + while (partition_info.probe_file) { + ASSERT_TRUE(local_state + ->recover_probe_blocks_from_partition(_helper.runtime_state.get(), + partition_info) + .ok()); + } + + // Verify all data recovered + int64_t total_rows = 0; + for (const auto& block : local_state->_queue_probe_blocks) { + total_rows += block.rows(); + } + ASSERT_EQ(total_rows, 9); + + auto* recovery_rows = local_state->custom_profile()->get_counter("SpillRecoveryProbeRows"); + ASSERT_EQ(recovery_rows->value(), 9); + auto* recovery_blocks = 
local_state->custom_profile()->get_counter("SpillRecoveryProbeBlocks"); + ASSERT_EQ(recovery_blocks->value(), 3); +} + +// Test RecoverBuildBlocks with multiple blocks in one spill file. +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildMultipleBlocks) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create spill file with 3 blocks + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(relative_path, spill_file) + .ok()); + + { + vectorized::SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + + for (int batch = 0; batch < 3; ++batch) { + vectorized::Block block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {batch * 100 + 1, batch * 100 + 2}); + ASSERT_TRUE(writer->write_block(_helper.runtime_state.get(), block).ok()); + } + ASSERT_TRUE(writer->close().ok()); + } + + // Recover all blocks + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); + while (partition_info.build_file) { + ASSERT_TRUE(local_state + ->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info) + .ok()); + } + + // Verify all data recovered + ASSERT_TRUE(local_state->_recovered_build_block != nullptr); + ASSERT_EQ(local_state->_recovered_build_block->rows(), 6); + + auto* recovery_rows = local_state->custom_profile()->get_counter("SpillRecoveryBuildRows"); + ASSERT_EQ(recovery_rows->value(), 6); + auto* recovery_blocks = 
local_state->custom_profile()->get_counter("SpillRecoveryBuildBlocks"); + ASSERT_EQ(recovery_blocks->value(), 3); +} + +// Test queue with all empty partitions reaches EOS. +TEST_F(PartitionedHashJoinProbeOperatorTest, PullAllEmptyPartitions) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Initialize queue with 3 empty partitions + local_state->_spill_queue_initialized = true; + for (int i = 0; i < 3; ++i) { + local_state->_spill_partition_queue.emplace_back( + JoinSpillPartitionInfo(nullptr, nullptr, 0)); + } + + vectorized::Block output_block; + bool eos = false; + int iterations = 0; + + while (!eos && iterations < 10) { + local_state->_need_to_setup_queue_partition = true; + auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()) << "Pull failed at iteration " << iterations; + iterations++; + } + + ASSERT_TRUE(eos) << "Should reach EOS after processing all empty partitions"; + ASSERT_TRUE(local_state->_spill_partition_queue.empty()) + << "Queue should be empty after processing all partitions"; +} + +// Test JoinSpillPartitionInfo validity. 
+TEST_F(PartitionedHashJoinProbeOperatorTest, JoinSpillPartitionInfoValidation) { + // Default constructed should be invalid + JoinSpillPartitionInfo default_info; + ASSERT_FALSE(default_info.is_valid()); + + // Constructed with files should be valid + vectorized::SpillFileSPtr build_file; + auto relative_path = + fmt::format("{}/hash_build-test-{}", print_id(_helper.runtime_state->query_id()), + ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(relative_path, build_file) + .ok()); + + JoinSpillPartitionInfo valid_info(build_file, nullptr, 1); + ASSERT_TRUE(valid_info.is_valid()); + ASSERT_EQ(valid_info.level, 1); + ASSERT_TRUE(valid_info.build_file != nullptr); + ASSERT_TRUE(valid_info.probe_file == nullptr); + + // Null files + initialized should still be valid + JoinSpillPartitionInfo null_files_info(nullptr, nullptr, 0); + ASSERT_TRUE(null_files_info.is_valid()); +} diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp index a5adf33a4ac..d66764cf794 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -68,9 +68,8 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, Init) { tnode.row_tuples.push_back(desc_tbl.get_tuple_descs().front()->id()); - PartitionedHashJoinSinkOperatorX operator_x( - _helper.obj_pool.get(), 0, 0, tnode, desc_tbl, - PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + PartitionedHashJoinSinkOperatorX operator_x(_helper.obj_pool.get(), 0, 0, tnode, desc_tbl); + operator_x._partition_count = PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; auto child = std::make_shared<MockChildOperator>(); child->_row_descriptor = RowDescriptor(_helper.runtime_state->desc_tbl(), {1}); @@ -163,9 +162,8 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitBuildExprs) 
{ } DescriptorTbl desc_tbl; - PartitionedHashJoinSinkOperatorX operator_x( - _helper.obj_pool.get(), 0, 0, tnode, desc_tbl, - PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + PartitionedHashJoinSinkOperatorX operator_x(_helper.obj_pool.get(), 0, 0, tnode, desc_tbl); + operator_x._partition_count = PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ASSERT_TRUE(operator_x.init(tnode, _helper.runtime_state.get())); ASSERT_EQ(operator_x._build_exprs.size(), 4); // 1个初始 + 3个新增 @@ -290,7 +288,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryEmpty) { shared_state->_is_spilled = false; // Expect revoke memory to trigger spilling - auto status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + auto status = sink_state->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); ASSERT_TRUE(sink_state->_shared_state->_is_spilled); } @@ -322,12 +320,12 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) { DCHECK_GE(sink_operator->_child->row_desc().get_column_id(1), 0); for (uint32_t i = 0; i != sink_operator->_partition_count; ++i) { - auto& spilling_file = sink_state->_shared_state->_spilled_files[i]; - auto st = (ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - _helper.runtime_state.get(), spilling_file, - print_id(_helper.runtime_state->query_id()), fmt::format("hash_build_sink_{}", i), - sink_operator->node_id(), std::numeric_limits<size_t>::max(), - sink_state->operator_profile())); + auto& spilling_file = sink_state->_shared_state->_spilled_build_groups[i]; + auto relative_path = fmt::format( + "{}/hash_build_sink_{}-{}-{}", print_id(_helper.runtime_state->query_id()), i, + sink_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, + spilling_file); ASSERT_TRUE(st.ok()) << "Register spill stream failed: " << st.to_string(); } @@ -352,7 +350,7 
@@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) { "HashJoinBuildFinishDependency", true); // Expect revoke memory to trigger spilling - status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + status = sink_state->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); ASSERT_TRUE(sink_state->_shared_state->_is_spilled); @@ -369,10 +367,98 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) { sink_state->_shared_state->_partitioned_build_blocks[0] = vectorized::MutableBlock::create_unique(std::move(large_block)); - status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + status = sink_state->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); ASSERT_EQ(written_rows + 3 * 1024 * 1024, written_rows_counter->value()); } -} // namespace doris::pipeline +// Test multiple revoke_memory cycles with different data sizes. 
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryMultipleCycles) { + auto [_, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(), + sink_operator.get(), shared_state); + + auto child = std::dynamic_pointer_cast<MockChildOperator>(sink_operator->child()); + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {1}); + child->_row_descriptor = row_desc; + + const auto& tnode = sink_operator->_tnode; + auto partitioner = std::make_unique<SpillPartitionerType>(sink_operator->_partition_count); + auto status = partitioner->init({tnode.hash_join_node.eq_join_conjuncts[0].right}); + ASSERT_TRUE(status.ok()); + status = partitioner->prepare(_helper.runtime_state.get(), sink_operator->_child->row_desc()); + ASSERT_TRUE(status.ok()); + sink_state->_partitioner = std::move(partitioner); + sink_state->_shared_state->_is_spilled = false; + + // Setup spill files for all partitions + for (uint32_t i = 0; i != sink_operator->_partition_count; ++i) { + auto& spilling_file = sink_state->_shared_state->_spilled_build_groups[i]; + auto relative_path = fmt::format( + "{}/hash_build_sink_{}-{}-{}", print_id(_helper.runtime_state->query_id()), i, + sink_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, + spilling_file); + ASSERT_TRUE(st.ok()); + } + + auto& inner_sink = sink_operator->_inner_sink_operator; + auto inner_sink_local_state = std::make_unique<MockHashJoinBuildSinkLocalState>( + inner_sink.get(), sink_state->_shared_state->_inner_runtime_state.get()); + inner_sink_local_state->_hash_table_memory_usage = + sink_state->custom_profile()->add_counter("HashTableMemoryUsage", TUnit::BYTES); + inner_sink_local_state->_build_arena_memory_usage = + sink_state->operator_profile()->add_counter("BuildArenaMemoryUsage", 
TUnit::BYTES); + + sink_state->_finish_dependency = + Dependency::create_shared(sink_operator->operator_id(), sink_operator->node_id(), + "HashJoinBuildFinishDependency", true); + + // Round 1: small data + auto block1 = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({1, 2, 3}); + inner_sink_local_state->_build_side_mutable_block = std::move(block1); + sink_state->_shared_state->_inner_runtime_state->emplace_sink_local_state( + 0, std::move(inner_sink_local_state)); + + status = sink_state->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(status.ok()) << "Revoke memory round 1 failed: " << status.to_string(); + ASSERT_TRUE(sink_state->_shared_state->_is_spilled); + + auto* written_rows_counter = sink_state->custom_profile()->get_counter("SpillWriteRows"); + ASSERT_TRUE(written_rows_counter != nullptr); + auto round1_rows = written_rows_counter->value(); + ASSERT_GT(round1_rows, 0) << "Should have spilled some rows in round 1"; + + // Round 2: more data via partitioned blocks + auto block2 = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({10, 20, 30, 40, 50}); + sink_state->_shared_state->_partitioned_build_blocks[1] = + vectorized::MutableBlock::create_unique(std::move(block2)); + + status = sink_state->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(status.ok()) << "Revoke memory round 2 failed: " << status.to_string(); + + ASSERT_GT(written_rows_counter->value(), round1_rows) + << "Rows counter should have increased after round 2"; +} + +// Test that revocable_mem_size returns 0 immediately after revoke. 
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevocableMemSizeAfterRevoke) { + auto [_, sink_operator] = _helper.create_operators(); + + std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state; + auto* sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(), + sink_operator.get(), shared_state); + + shared_state->_is_spilled = false; + + auto status = sink_state->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(sink_state->_shared_state->_is_spilled); + + // After revoke with no data, revocable_mem_size should be 0 + ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); +} diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp index 21e57daf36e..34d02881ead 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp @@ -112,10 +112,12 @@ PartitionedHashJoinTestHelper::create_operators() { EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 2); - auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>( - obj_pool.get(), tnode, 0, desc_tbl, TEST_PARTITION_COUNT); - auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>( - obj_pool.get(), 0, 0, tnode, desc_tbl, TEST_PARTITION_COUNT); + auto probe_operator = + std::make_shared<PartitionedHashJoinProbeOperatorX>(obj_pool.get(), tnode, 0, desc_tbl); + probe_operator->_partition_count = TEST_PARTITION_COUNT; + auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(obj_pool.get(), 0, 0, + tnode, desc_tbl); + sink_operator->_partition_count = TEST_PARTITION_COUNT; auto child_operator = std::make_shared<MockChildOperator>(); auto probe_side_source_operator = std::make_shared<MockChildOperator>(); @@ -176,13 +178,12 @@ PartitionedHashJoinProbeLocalState* PartitionedHashJoinTestHelper::create_probe_ 
local_state->init_spill_read_counters(); local_state->init_spill_write_counters(); local_state->init_counters(); - local_state->_copy_shared_spill_profile = false; local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test"); local_state->_partitioned_blocks.resize(probe_operator->_partition_count); - local_state->_probe_spilling_files.resize(probe_operator->_partition_count); + local_state->_probe_spilling_groups.resize(probe_operator->_partition_count); - shared_state->_spilled_files.resize(probe_operator->_partition_count); + shared_state->_spilled_build_groups.resize(probe_operator->_partition_count); shared_state->_partitioned_build_blocks.resize(probe_operator->_partition_count); shared_state->_inner_runtime_state = std::make_unique<MockRuntimeState>(); @@ -211,12 +212,11 @@ PartitionedHashJoinSinkLocalState* PartitionedHashJoinTestHelper::create_sink_lo sink_operator->dests_id().front(), sink_operator->operator_id(), "PartitionedHashJoinTestDep"); - shared_state->_spilled_files.resize(sink_operator->_partition_count); + shared_state->_spilled_build_groups.resize(sink_operator->_partition_count); shared_state->_partitioned_build_blocks.resize(sink_operator->_partition_count); shared_state->_inner_runtime_state = std::make_unique<MockRuntimeState>(); shared_state->_inner_shared_state = std::make_shared<MockHashJoinSharedState>(); - shared_state->setup_shared_profile(local_state->custom_profile()); state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr)); return local_state; diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h index f0c21ce1ea2..43c8cdfc915 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h @@ -45,12 +45,12 @@ public: MockPartitionedHashJoinSharedState() { _is_spilled = false; _inner_runtime_state = nullptr; - 
_spilled_files.clear(); + _spilled_build_groups.clear(); _partitioned_build_blocks.clear(); } void init(size_t partition_count) { - _spilled_files.resize(partition_count); + _spilled_build_groups.resize(partition_count); _partitioned_build_blocks.resize(partition_count); } }; diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp index 4b3519dfec6..df7a4d30001 100644 --- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp +++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp @@ -232,7 +232,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill) { st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false); ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); auto input_block2 = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( @@ -302,7 +302,7 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpill2) { st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false); ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); @@ -361,9 +361,137 @@ TEST_F(SpillSortSinkOperatorTest, SinkWithSpillError) { SpillableDebugPointHelper dp_helper("fault_inject::spill_file::spill_block"); - st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); ASSERT_FALSE(st.ok()) << "spilll status should be failed"; } +// Test multiple consecutive revoke_memory calls to verify 
repeated spilling works. +TEST_F(SpillSortSinkOperatorTest, SinkMultipleRevokes) { + auto [source_operator, sink_operator] = _helper.create_operators(); + + auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = + std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state()); + ASSERT_TRUE(shared_state != nullptr); + + shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(), + "SpillSortSinkOperatorTest"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.operator_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .tsink = {}}; + + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + // Perform 3 rounds of sink → revoke + for (int round = 0; round < 3; ++round) { + auto input_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1 + round, 2 + round, 3 + round, 4 + round, 5 + round}); + input_block.insert( + vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( + {10, 9, 8, 7, 6})); + + st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false); + ASSERT_TRUE(st.ok()) << "sink failed on round " << round << ": " << st.to_string(); + + st = sink_operator->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << 
"revoke_memory failed on round " << round << ": " << st.to_string(); + + ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0) + << "revocable_mem_size should be 0 after revoke on round " << round; + } + + // After 3 rounds of spilling, should have 3 spill files + ASSERT_EQ(shared_state->sorted_spill_groups.size(), 3) + << "Should have 3 spill groups after 3 revokes"; + + ASSERT_TRUE(shared_state->is_spilled) << "is_spilled should be true after revoke"; +} + +// Test sinking large data (>1M rows), then verify spill counters. +TEST_F(SpillSortSinkOperatorTest, SinkLargeDataWithSpill) { + auto [source_operator, sink_operator] = _helper.create_operators(); + + auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = + std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state()); + ASSERT_TRUE(shared_state != nullptr); + + shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(), + "SpillSortSinkOperatorTest"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.operator_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .tsink = {}}; + + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + // Create large data + const size_t count = 100000; + std::vector<int32_t> 
data(count); + std::iota(data.begin(), data.end(), 0); + std::vector<int64_t> data2(count); + std::iota(data2.begin(), data2.end(), 0); + + auto input_block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data); + input_block.insert( + vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>(data2)); + + st = sink_operator->sink(_helper.runtime_state.get(), &input_block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + st = sink_operator->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + ASSERT_TRUE(shared_state->is_spilled); + + auto* spill_write_rows = sink_local_state->custom_profile()->get_counter("SpillWriteRows"); + ASSERT_TRUE(spill_write_rows != nullptr); + ASSERT_EQ(spill_write_rows->value(), count) + << "SpillWriteRows should match the number of rows sunk"; + + // Sink empty EOS after spill + vectorized::Block empty_block; + st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true); + ASSERT_TRUE(st.ok()) << "sink eos failed: " << st.to_string(); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp index b00dd66a1ad..7286ff58de8 100644 --- a/be/test/pipeline/operator/spill_sort_source_operator_test.cpp +++ b/be/test/pipeline/operator/spill_sort_source_operator_test.cpp @@ -118,8 +118,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlock) { auto* local_state = _helper.runtime_state->get_local_state(source_operator->operator_id()); ASSERT_TRUE(local_state != nullptr); - shared_state->setup_shared_profile(_helper.operator_profile.get()); - st = local_state->open(_helper.runtime_state.get()); 
ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -197,8 +195,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - shared_state->setup_shared_profile(_helper.operator_profile.get()); - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -216,11 +212,10 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { // Prepare stored streams for (size_t i = 0; i != 4; ++i) { vectorized::SpillFileSPtr spill_file; - st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - _helper.runtime_state.get(), spill_file, - print_id(_helper.runtime_state->query_id()), sink_operator->get_name(), - sink_operator->node_id(), std::numeric_limits<int32_t>::max(), - _helper.operator_profile.get()); + auto relative_path = fmt::format("{}/{}-{}-{}", print_id(_helper.runtime_state->query_id()), + sink_operator->get_name(), sink_operator->node_id(), + ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, spill_file); ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string(); std::vector<int32_t> data; @@ -236,10 +231,16 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( data2)); - st = spill_file->spill_block(_helper.runtime_state.get(), input_block, true); - ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string(); + vectorized::SpillFileWriterSPtr writer; + st = spill_file->create_writer(_helper.runtime_state.get(), _helper.operator_profile.get(), + writer); + ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string(); + st = writer->write_block(_helper.runtime_state.get(), input_block); + ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string(); + st = writer->close(); + 
ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string(); - shared_state->sorted_spill_files.emplace_back(std::move(spill_file)); + shared_state->sorted_spill_groups.emplace_back(std::move(spill_file)); } std::unique_ptr<vectorized::MutableBlock> mutable_block; @@ -262,7 +263,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { } ASSERT_TRUE(eos); - ASSERT_TRUE(shared_state->sorted_spill_files.empty()) << "sorted_spill_files is not empty"; + ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << "sorted_spill_groups is not empty"; ASSERT_TRUE(mutable_block) << "mutable_block is null"; ASSERT_EQ(mutable_block->rows(), 40); auto output_block = mutable_block->to_block(); @@ -388,8 +389,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - shared_state->setup_shared_profile(_helper.operator_profile.get()); - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -407,11 +406,10 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { // Prepare stored streams for (size_t i = 0; i != 4; ++i) { vectorized::SpillFileSPtr spill_file; - st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - _helper.runtime_state.get(), spill_file, - print_id(_helper.runtime_state->query_id()), sink_operator->get_name(), - sink_operator->node_id(), std::numeric_limits<int32_t>::max(), - std::numeric_limits<int32_t>::max(), _helper.operator_profile.get()); + auto relative_path = fmt::format("{}/{}-{}-{}", print_id(_helper.runtime_state->query_id()), + sink_operator->get_name(), sink_operator->node_id(), + ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, spill_file); ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string(); std::vector<int32_t> data; @@ -427,10 +425,16 @@ 
TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( data2)); - st = spill_file->spill_block(_helper.runtime_state.get(), input_block, true); - ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string(); + vectorized::SpillFileWriterSPtr writer; + st = spill_file->create_writer(_helper.runtime_state.get(), _helper.operator_profile.get(), + writer); + ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string(); + st = writer->write_block(_helper.runtime_state.get(), input_block); + ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string(); + st = writer->close(); + ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string(); - shared_state->sorted_spill_files.emplace_back(std::move(spill_file)); + shared_state->sorted_spill_groups.emplace_back(std::move(spill_file)); } auto query_options = _helper.runtime_state->query_options(); @@ -457,7 +461,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { } } - ASSERT_TRUE(shared_state->sorted_spill_files.empty()) << "sorted_spill_files is not empty"; + ASSERT_TRUE(shared_state->sorted_spill_groups.empty()) << "sorted_spill_groups is not empty"; ASSERT_TRUE(mutable_block) << "mutable_block is null"; ASSERT_EQ(mutable_block->rows(), 40); auto output_block = mutable_block->to_block(); @@ -535,8 +539,6 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { _helper.runtime_state->get_local_state(source_operator->operator_id())); ASSERT_TRUE(local_state != nullptr); - shared_state->setup_shared_profile(_helper.operator_profile.get()); - st = local_state->open(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); @@ -554,11 +556,10 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { // Prepare stored streams for (size_t i = 0; i != 4; ++i) { vectorized::SpillFileSPtr spill_file; - st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( - 
_helper.runtime_state.get(), spill_file, - print_id(_helper.runtime_state->query_id()), sink_operator->get_name(), - sink_operator->node_id(), std::numeric_limits<int32_t>::max(), - std::numeric_limits<int32_t>::max(), _helper.operator_profile.get()); + auto relative_path = fmt::format("{}/{}-{}-{}", print_id(_helper.runtime_state->query_id()), + sink_operator->get_name(), sink_operator->node_id(), + ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, spill_file); ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string(); std::vector<int32_t> data; @@ -574,10 +575,16 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( data2)); - st = spill_file->spill_block(_helper.runtime_state.get(), input_block, true); - ASSERT_TRUE(st.ok()) << "spill_block failed: " << st.to_string(); + vectorized::SpillFileWriterSPtr writer; + st = spill_file->create_writer(_helper.runtime_state.get(), _helper.operator_profile.get(), + writer); + ASSERT_TRUE(st.ok()) << "create_writer failed: " << st.to_string(); + st = writer->write_block(_helper.runtime_state.get(), input_block); + ASSERT_TRUE(st.ok()) << "write_block failed: " << st.to_string(); + st = writer->close(); + ASSERT_TRUE(st.ok()) << "close writer failed: " << st.to_string(); - shared_state->sorted_spill_files.emplace_back(std::move(spill_file)); + shared_state->sorted_spill_groups.emplace_back(std::move(spill_file)); } SpillableDebugPointHelper dp_helper("fault_inject::spill_file::read_next_block"); @@ -613,4 +620,244 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); } +// Test reading from a single spill file to verify minimal sorted output. 
+TEST_F(SpillSortSourceOperatorTest, GetBlockWithSingleSpillFile) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + + auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = + std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state()); + ASSERT_TRUE(shared_state != nullptr); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + LocalSinkStateInfo sink_info {0, _helper.operator_profile.get(), -1, shared_state.get(), {}, + {}}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = _helper.runtime_state->get_sink_local_state(); + DCHECK(sink_local_state != nullptr); + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + LocalStateInfo info {.parent_profile = _helper.operator_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .task_idx = 0}; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<SpillSortLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(local_state != nullptr); + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << 
st.to_string(); + + shared_state->is_spilled = true; + + auto* sorter = shared_state->in_mem_shared_state->sorter.get(); + sorter->_sort_description.resize(sorter->_vsort_exec_exprs.ordering_expr_ctxs().size()); + for (int i = 0; i < sorter->_sort_description.size(); i++) { + sorter->_sort_description[i].column_number = i; + sorter->_sort_description[i].direction = 1; + sorter->_sort_description[i].nulls_direction = 1; + } + + // Create a single spill file with descending data + { + vectorized::SpillFileSPtr spill_file; + auto relative_path = fmt::format("{}/{}-{}-{}", print_id(_helper.runtime_state->query_id()), + sink_operator->get_name(), sink_operator->node_id(), + ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path, spill_file); + ASSERT_TRUE(st.ok()) << "create_spill_file failed: " << st.to_string(); + + auto input_block = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({5, 4, 3, 2, 1}); + input_block.insert( + vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( + {50, 40, 30, 20, 10})); + + vectorized::SpillFileWriterSPtr writer; + st = spill_file->create_writer(_helper.runtime_state.get(), _helper.operator_profile.get(), + writer); + ASSERT_TRUE(st.ok()); + st = writer->write_block(_helper.runtime_state.get(), input_block); + ASSERT_TRUE(st.ok()); + st = writer->close(); + ASSERT_TRUE(st.ok()); + + shared_state->sorted_spill_groups.emplace_back(std::move(spill_file)); + } + + // Read all blocks from source + std::unique_ptr<vectorized::MutableBlock> mutable_block; + bool eos = false; + while (!eos) { + vectorized::Block block; + shared_state->spill_block_batch_row_count = 100; + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + if (block.empty()) { + continue; + } + if (!mutable_block) { + mutable_block = 
vectorized::MutableBlock::create_unique(std::move(block)); + } else { + st = mutable_block->merge(std::move(block)); + ASSERT_TRUE(st.ok()); + } + } + + ASSERT_TRUE(eos); + ASSERT_TRUE(mutable_block) << "mutable_block is null"; + ASSERT_EQ(mutable_block->rows(), 5); + + auto output_block = mutable_block->to_block(); + const auto& col1 = output_block.get_by_position(0).column; + + // Verify sorted order (ascending) + for (size_t i = 1; i < col1->size(); ++i) { + ASSERT_GE(col1->get_int(i), col1->get_int(i - 1)); + } + + st = local_state->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); +} + +// Test full pipeline: sink data → revoke → read back sorted from source. +TEST_F(SpillSortSourceOperatorTest, EndToEndSinkAndSource) { + auto [source_operator, sink_operator] = _helper.create_operators(); + + auto tnode = _helper.create_test_plan_node(); + auto shared_state = + std::dynamic_pointer_cast<SpillSortSharedState>(sink_operator->create_shared_state()); + ASSERT_TRUE(shared_state != nullptr); + + // Initialize and prepare both operators + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + + st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + + shared_state->create_source_dependency(sink_operator->operator_id(), sink_operator->node_id(), + "SpillSortSinkOperatorTest"); + + // Setup sink local state + LocalSinkStateInfo sink_info {0, _helper.operator_profile.get(), -1, shared_state.get(), {}, + {}}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()); + + auto* sink_local_state = reinterpret_cast<SpillSortSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + 
ASSERT_TRUE(sink_local_state != nullptr); + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + + // Setup source local state + LocalStateInfo source_info {.parent_profile = _helper.operator_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .task_idx = 0}; + st = source_operator->setup_local_state(_helper.runtime_state.get(), source_info); + ASSERT_TRUE(st.ok()); + + auto* source_local_state = reinterpret_cast<SpillSortLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(source_local_state != nullptr); + st = source_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + + // Sink batch 1: {5,3,1,4,2} → revoke + auto block1 = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({5, 3, 1, 4, 2}); + block1.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( + {50, 30, 10, 40, 20})); + st = sink_operator->sink(_helper.runtime_state.get(), &block1, false); + ASSERT_TRUE(st.ok()); + st = sink_operator->revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + + // Sink batch 2: {10,8,6,9,7} → revoke + auto block2 = + vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>({10, 8, 6, 9, 7}); + block2.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt64>( + {100, 80, 60, 90, 70})); + st = sink_operator->sink(_helper.runtime_state.get(), &block2, false); + ASSERT_TRUE(st.ok()); + + // Sink EOS (triggers final revoke since is_spilled) + vectorized::Block empty_block; + st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true); + ASSERT_TRUE(st.ok()); + + ASSERT_TRUE(shared_state->is_spilled); + ASSERT_GE(shared_state->sorted_spill_groups.size(), 2u); + + // Read back from source + auto* sorter = shared_state->in_mem_shared_state->sorter.get(); + 
sorter->_sort_description.resize(sorter->_vsort_exec_exprs.ordering_expr_ctxs().size()); + for (int i = 0; i < sorter->_sort_description.size(); i++) { + sorter->_sort_description[i].column_number = i; + sorter->_sort_description[i].direction = 1; + sorter->_sort_description[i].nulls_direction = 1; + } + + std::unique_ptr<vectorized::MutableBlock> mutable_block; + bool eos = false; + while (!eos) { + vectorized::Block block; + shared_state->spill_block_batch_row_count = 100; + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + if (block.empty()) continue; + if (!mutable_block) { + mutable_block = vectorized::MutableBlock::create_unique(std::move(block)); + } else { + st = mutable_block->merge(std::move(block)); + ASSERT_TRUE(st.ok()); + } + } + + ASSERT_TRUE(eos); + ASSERT_TRUE(mutable_block); + ASSERT_EQ(mutable_block->rows(), 10); + + auto output_block = mutable_block->to_block(); + const auto& col1 = output_block.get_by_position(0).column; + + // Verify sorted order + for (size_t i = 1; i < col1->size(); ++i) { + ASSERT_GE(col1->get_int(i), col1->get_int(i - 1)) + << "Not sorted at index " << i << ": " << col1->get_int(i - 1) << " > " + << col1->get_int(i); + } + + st = source_local_state->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()); +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp b/be/test/pipeline/operator/spill_sort_test_helper.cpp index eb0b5341429..1a6eaafbab8 100644 --- a/be/test/pipeline/operator/spill_sort_test_helper.cpp +++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp @@ -130,7 +130,7 @@ SpillSortLocalState* SpillSortTestHelper::create_source_local_state( // Build a minimal local state manually. 
Many tests prefer to use the // operators' own setup routines, but helper functions like this allow // individual units to be exercised without the full pipeline task. - auto local_state_uptr = std::make_unique<SpillSortLocalState>(source_operator, state); + auto local_state_uptr = std::make_unique<SpillSortLocalState>(state, source_operator); auto* local_state = local_state_uptr.get(); shared_state = std::make_shared<MockSpillSortSharedState>(); @@ -144,7 +144,6 @@ SpillSortLocalState* SpillSortTestHelper::create_source_local_state( local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0); local_state->init_spill_read_counters(); local_state->init_spill_write_counters(); - local_state->_copy_shared_spill_profile = false; local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test"); state->emplace_local_state(source_operator->operator_id(), std::move(local_state_uptr)); @@ -170,8 +169,6 @@ SpillSortSinkLocalState* SpillSortTestHelper::create_sink_local_state( sink_operator->dests_id().front(), sink_operator->operator_id(), "SpillSortSinkTestDep"); - shared_state->setup_shared_profile(local_state->custom_profile()); - state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr)); return local_state; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
