This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 27d094afe97 [feat](spill) support runtime filter (#46972)
27d094afe97 is described below

commit 27d094afe973c99b88bf7e2ddb20b4f005700173
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Jan 14 22:09:33 2025 +0800

    [feat](spill) support runtime filter (#46972)
---
 .../pipeline/exec/partitioned_hash_join_sink_operator.cpp |  2 ++
 be/src/pipeline/pipeline_fragment_context.cpp             | 14 ++++++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8d100dd4530..4da7abec23e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -176,6 +176,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     if (inner_sink_state) {
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
+        RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters(
+                _shared_state->inner_runtime_state.get()));
     }
 
     if (build_block.rows() <= 1) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 09a14c66a7f..2d96b7f7b1d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1376,20 +1376,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         const auto enable_spill = _runtime_state->enable_spill();
         if (enable_spill && !is_broadcast_join) {
             auto tnode_ = tnode;
-            /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
             uint32_t partition_count = _runtime_state->spill_hash_join_partition_count();
             auto inner_probe_operator =
                     std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
-            auto inner_sink_operator =
+            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
+            // So here use `tnode_` which has no runtime filters.
+            auto probe_side_inner_sink_operator =
                     std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, tnode_, descs);
 
             RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
-            RETURN_IF_ERROR(inner_sink_operator->init(tnode_, _runtime_state.get()));
+            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
 
             auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
                     pool, tnode_, next_operator_id(), descs, partition_count);
-            probe_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
+            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
+                                                inner_probe_operator);
             op = std::move(probe_operator);
             RETURN_IF_ERROR(cur_pipe->add_operator(
                     op, request.__isset.parallel_instances ? request.parallel_instances : 0));
@@ -1401,8 +1403,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
+            auto inner_sink_operator =
+                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, tnode, descs);
             auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
                     pool, next_sink_operator_id(), tnode_, descs, partition_count);
+
+            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
             sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
             DataSinkOperatorPtr sink = std::move(sink_operator);
             sink->set_dests_id({op->operator_id()});
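A note on the first hunk: once the unpartitioned build block is revoked (spilled), the in-memory inner sink can no longer finish building its runtime filters from that block, so they are disabled against the inner runtime state before the data is repartitioned. Below is a minimal, self-contained sketch of that pattern only; RuntimeFilterProducer, InnerSinkState, and revoke_build_block are hypothetical stand-ins for illustration, not the Doris classes touched by this commit.

// Sketch of "disable runtime filters before revoking (spilling) the
// unpartitioned build block". All types are hypothetical stand-ins.
#include <iostream>
#include <utility>
#include <vector>

struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};

// Stand-in for a runtime-filter producer owned by the in-memory build sink.
struct RuntimeFilterProducer {
    bool disabled = false;
    // Once the build data is spilled, the filter cannot be completed from the
    // unpartitioned block, so it is marked disabled (effectively pass-all).
    Status disable() {
        disabled = true;
        return Status::OK();
    }
};

// Stand-in for the inner (in-memory) hash join sink state.
struct InnerSinkState {
    std::vector<RuntimeFilterProducer> filters;
    std::vector<int> build_block; // stand-in for the accumulated build rows

    Status disable_runtime_filters() {
        for (auto& f : filters) {
            if (Status s = f.disable(); !s.ok) return s;
        }
        return Status::OK();
    }
};

// Mirrors the shape of the revoke path: disable the filters first, then hand
// the build rows over to the spill path.
Status revoke_build_block(InnerSinkState& inner, std::vector<int>& spilled_out) {
    if (Status s = inner.disable_runtime_filters(); !s.ok) return s;
    spilled_out = std::move(inner.build_block); // pretend this goes to disk
    return Status::OK();
}

int main() {
    InnerSinkState inner{{RuntimeFilterProducer{}, RuntimeFilterProducer{}}, {1, 2, 3}};
    std::vector<int> spilled;
    revoke_build_block(inner, spilled);
    std::cout << "spilled rows: " << spilled.size()
              << ", first filter disabled: " << inner.filters[0].disabled << '\n';
    return 0;
}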
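And on pipeline_fragment_context.cpp: the probe side keeps its own inner build sink only to rebuild hash tables from spilled partitions, so it is constructed from tnode_ (the copy whose runtime_filters were cleared), while the new build-side inner sink is constructed from the original tnode and therefore still produces runtime filters. A rough sketch of that split, assuming made-up PlanNode/BuildSink types rather than the real operator classes:

// Sketch of wiring two inner build sinks for a spillable hash join:
// - the probe-side copy only rebuilds hash tables from spilled data, so its
//   plan-node copy has runtime filters cleared;
// - the build-side sink keeps the original node and still publishes filters.
// PlanNode and BuildSink are made-up types for illustration only.
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct PlanNode {
    std::vector<std::string> runtime_filters;
};

class BuildSink {
public:
    explicit BuildSink(PlanNode node) : _node(std::move(node)) {}
    size_t filter_count() const { return _node.runtime_filters.size(); }

private:
    PlanNode _node;
};

int main() {
    PlanNode tnode;
    tnode.runtime_filters = {"rf0", "rf1"};

    // Copy for everything that must NOT produce runtime filters.
    PlanNode tnode_no_rf = tnode;
    tnode_no_rf.runtime_filters.clear();

    // Probe-side inner sink: only rebuilds the hash table from spilled
    // partitions, so it gets the filter-free copy.
    auto probe_side_inner_sink = std::make_shared<BuildSink>(tnode_no_rf);

    // Build-side inner sink: created from the original node, so runtime
    // filters are still built and published on the build pipeline.
    auto inner_sink = std::make_shared<BuildSink>(tnode);

    std::cout << "probe-side sink filters: " << probe_side_inner_sink->filter_count()
              << ", build-side sink filters: " << inner_sink->filter_count() << '\n';
    return 0;
}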