This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 17bc0cc7ee6 [fix](spill) inner local state should be initialized in init phase (#47211) 17bc0cc7ee6 is described below commit 17bc0cc7ee6e9a436c87ce164f0989d8759d1dd6 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Fri Jan 24 05:17:30 2025 +0800 [fix](spill) inner local state should be initialized in init phase (#47211) --- .../exec/partitioned_hash_join_sink_operator.cpp | 26 +++++++++++----------- .../exec/partitioned_hash_join_sink_operator.h | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 635b642cbf3..2df474f42eb 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -61,6 +61,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1); _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1); + RETURN_IF_ERROR(_setup_internal_operator(state)); return Status::OK(); } @@ -70,7 +71,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { _shared_state->setup_shared_profile(_profile); RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state)); auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); - RETURN_IF_ERROR(p._setup_internal_operator(state)); + _shared_state->inner_runtime_state->set_task(state->get_task()); for (uint32_t i = 0; i != p._partition_count; ++i) { auto& spilling_stream = _shared_state->spilled_streams[i]; RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( @@ -513,9 +514,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { return _inner_sink_operator->open(state); } -Status 
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* state) { - auto& local_state = get_local_state(state); - +Status PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* state) { auto inner_runtime_state = RuntimeState::create_unique( state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); @@ -527,29 +526,30 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* inner_runtime_state->resize_op_id_to_local_state(-1); inner_runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr()); + auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); auto inner_shared_state = std::dynamic_pointer_cast<HashJoinSharedState>( - _inner_sink_operator->create_shared_state()); - LocalSinkStateInfo info { - 0, local_state._internal_runtime_profile.get(), -1, inner_shared_state.get(), {}, {}}; + p._inner_sink_operator->create_shared_state()); + LocalSinkStateInfo info {0, _internal_runtime_profile.get(), -1, inner_shared_state.get(), {}, + {}}; - RETURN_IF_ERROR(_inner_sink_operator->setup_local_state(inner_runtime_state.get(), info)); + RETURN_IF_ERROR(p._inner_sink_operator->setup_local_state(inner_runtime_state.get(), info)); auto* sink_local_state = inner_runtime_state->get_sink_local_state(); DCHECK(sink_local_state != nullptr); LocalStateInfo state_info { - local_state._internal_runtime_profile.get(), {}, inner_shared_state.get(), {}, 0}; + _internal_runtime_profile.get(), {}, inner_shared_state.get(), {}, 0}; RETURN_IF_ERROR( - _inner_probe_operator->setup_local_state(inner_runtime_state.get(), state_info)); + p._inner_probe_operator->setup_local_state(inner_runtime_state.get(), state_info)); auto* probe_local_state = - inner_runtime_state->get_local_state(_inner_probe_operator->operator_id()); + inner_runtime_state->get_local_state(p._inner_probe_operator->operator_id()); 
DCHECK(probe_local_state != nullptr); RETURN_IF_ERROR(probe_local_state->open(state)); RETURN_IF_ERROR(sink_local_state->open(state)); /// Set these two values after all the work is ready. - local_state._shared_state->inner_shared_state = std::move(inner_shared_state); - local_state._shared_state->inner_runtime_state = std::move(inner_runtime_state); + _shared_state->inner_shared_state = std::move(inner_shared_state); + _shared_state->inner_runtime_state = std::move(inner_runtime_state); return Status::OK(); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 73955932427..492c98fa637 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -69,6 +69,8 @@ protected: Status _finish_spilling(); + Status _setup_internal_operator(RuntimeState* state); + friend class PartitionedHashJoinSinkOperatorX; bool _child_eos {false}; @@ -140,8 +142,6 @@ public: private: friend class PartitionedHashJoinSinkLocalState; - Status _setup_internal_operator(RuntimeState* state); - const TJoinDistributionType::type _join_distribution; std::vector<TExpr> _build_exprs; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org