This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 17bc0cc7ee6 [fix](spill) inner local state should be initialized in init phase (#47211)
17bc0cc7ee6 is described below

commit 17bc0cc7ee6e9a436c87ce164f0989d8759d1dd6
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Fri Jan 24 05:17:30 2025 +0800

    [fix](spill) inner local state should be initialized in init phase (#47211)
---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 26 +++++++++++-----------
 .../exec/partitioned_hash_join_sink_operator.h     |  4 ++--
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 635b642cbf3..2df474f42eb 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -61,6 +61,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", 
TUnit::UNIT, 1);
     _memory_usage_reserved =
             ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", 
TUnit::BYTES, 1);
+    RETURN_IF_ERROR(_setup_internal_operator(state));
     return Status::OK();
 }
 
@@ -70,7 +71,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
     _shared_state->setup_shared_profile(_profile);
     RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
-    RETURN_IF_ERROR(p._setup_internal_operator(state));
+    _shared_state->inner_runtime_state->set_task(state->get_task());
     for (uint32_t i = 0; i != p._partition_count; ++i) {
         auto& spilling_stream = _shared_state->spilled_streams[i];
         
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -513,9 +514,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
     return _inner_sink_operator->open(state);
 }
 
-Status 
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* state) 
{
-    auto& local_state = get_local_state(state);
-
+Status 
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* 
state) {
     auto inner_runtime_state = RuntimeState::create_unique(
             state->fragment_instance_id(), state->query_id(), 
state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), 
state->get_query_ctx());
@@ -527,29 +526,30 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
     inner_runtime_state->resize_op_id_to_local_state(-1);
     
inner_runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());
 
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     auto inner_shared_state = std::dynamic_pointer_cast<HashJoinSharedState>(
-            _inner_sink_operator->create_shared_state());
-    LocalSinkStateInfo info {
-            0, local_state._internal_runtime_profile.get(), -1, 
inner_shared_state.get(), {}, {}};
+            p._inner_sink_operator->create_shared_state());
+    LocalSinkStateInfo info {0, _internal_runtime_profile.get(), -1, 
inner_shared_state.get(), {},
+                             {}};
 
-    
RETURN_IF_ERROR(_inner_sink_operator->setup_local_state(inner_runtime_state.get(),
 info));
+    
RETURN_IF_ERROR(p._inner_sink_operator->setup_local_state(inner_runtime_state.get(),
 info));
     auto* sink_local_state = inner_runtime_state->get_sink_local_state();
     DCHECK(sink_local_state != nullptr);
 
     LocalStateInfo state_info {
-            local_state._internal_runtime_profile.get(), {}, 
inner_shared_state.get(), {}, 0};
+            _internal_runtime_profile.get(), {}, inner_shared_state.get(), {}, 
0};
 
     RETURN_IF_ERROR(
-            
_inner_probe_operator->setup_local_state(inner_runtime_state.get(), 
state_info));
+            
p._inner_probe_operator->setup_local_state(inner_runtime_state.get(), 
state_info));
     auto* probe_local_state =
-            
inner_runtime_state->get_local_state(_inner_probe_operator->operator_id());
+            
inner_runtime_state->get_local_state(p._inner_probe_operator->operator_id());
     DCHECK(probe_local_state != nullptr);
     RETURN_IF_ERROR(probe_local_state->open(state));
     RETURN_IF_ERROR(sink_local_state->open(state));
 
     /// Set these two values after all the work is ready.
-    local_state._shared_state->inner_shared_state = 
std::move(inner_shared_state);
-    local_state._shared_state->inner_runtime_state = 
std::move(inner_runtime_state);
+    _shared_state->inner_shared_state = std::move(inner_shared_state);
+    _shared_state->inner_runtime_state = std::move(inner_runtime_state);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 73955932427..492c98fa637 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -69,6 +69,8 @@ protected:
 
     Status _finish_spilling();
 
+    Status _setup_internal_operator(RuntimeState* state);
+
     friend class PartitionedHashJoinSinkOperatorX;
 
     bool _child_eos {false};
@@ -140,8 +142,6 @@ public:
 private:
     friend class PartitionedHashJoinSinkLocalState;
 
-    Status _setup_internal_operator(RuntimeState* state);
-
     const TJoinDistributionType::type _join_distribution;
 
     std::vector<TExpr> _build_exprs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to