This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 27d094afe97 [feat](spill) support runtime filter (#46972)
27d094afe97 is described below

commit 27d094afe973c99b88bf7e2ddb20b4f005700173
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Jan 14 22:09:33 2025 +0800

    [feat](spill) support runtime filter (#46972)
---
 .../pipeline/exec/partitioned_hash_join_sink_operator.cpp  |  2 ++
 be/src/pipeline/pipeline_fragment_context.cpp              | 14 ++++++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8d100dd4530..4da7abec23e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -176,6 +176,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     if (inner_sink_state) {
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
+        RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters(
+                _shared_state->inner_runtime_state.get()));
     }
 
     if (build_block.rows() <= 1) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 09a14c66a7f..2d96b7f7b1d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1376,20 +1376,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         const auto enable_spill = _runtime_state->enable_spill();
         if (enable_spill && !is_broadcast_join) {
             auto tnode_ = tnode;
-            /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
             uint32_t partition_count = _runtime_state->spill_hash_join_partition_count();
             auto inner_probe_operator =
                     std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
-            auto inner_sink_operator =
+            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
+            // So here use `tnode_` which has no runtime filters.
+            auto probe_side_inner_sink_operator =
                     std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, tnode_, descs);
 
             RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
-            RETURN_IF_ERROR(inner_sink_operator->init(tnode_, _runtime_state.get()));
+            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
 
             auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
                     pool, tnode_, next_operator_id(), descs, partition_count);
-            probe_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
+            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
+                                                inner_probe_operator);
             op = std::move(probe_operator);
             RETURN_IF_ERROR(cur_pipe->add_operator(
                     op, request.__isset.parallel_instances ? request.parallel_instances : 0));
@@ -1401,8 +1403,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
+            auto inner_sink_operator =
+                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, tnode, descs);
             auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
                     pool, next_sink_operator_id(), tnode_, descs, partition_count);
+
+            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
             sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
             DataSinkOperatorPtr sink = std::move(sink_operator);
             sink->set_dests_id({op->operator_id()});


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to