This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bb720e4307  (feature)[pipelineX]Make operator_id negative in 
pipelineX (#29649)
5bb720e4307 is described below

commit 5bb720e43073b6c11accb957ab9540bdadc8656c
Author: Mryange <[email protected]>
AuthorDate: Tue Jan 9 09:27:19 2024 +0800

     (feature)[pipelineX]Make operator_id negative in pipelineX (#29649)
    
    "operator_id" should be invisible, but the local shuffle is a planned 
operator in the BE (Backend), without a plan node ID. We use it in profiles and 
other places, where it might collide with a real plan node ID. Therefore, we 
switch it to a negative number here to distinguish it from a plan node ID.
---
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 2 +-
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h   | 4 ++--
 be/src/runtime/runtime_state.cpp                           | 7 +++++--
 be/src/runtime/runtime_state.h                             | 2 +-
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a8a576889d2..6b1d9af512b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -502,7 +502,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             runtime_state->set_desc_tbl(_desc_tbl);
             
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
             runtime_state->set_num_per_fragment_instances(request.num_senders);
-            runtime_state->resize_op_id_to_local_state(max_operator_id(), 
max_sink_operator_id());
+            runtime_state->resize_op_id_to_local_state(max_operator_id());
             
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
             runtime_state->set_total_load_streams(request.total_load_streams);
             runtime_state->set_num_local_sink(request.num_local_sink);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index c8f9ef1edc6..34320d64f38 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -109,11 +109,11 @@ public:
         return _runtime_filter_mgr_map[fragment_instance_id].get();
     }
 
-    [[nodiscard]] int next_operator_id() { return _operator_id++; }
+    [[nodiscard]] int next_operator_id() { return _operator_id--; }
 
     [[nodiscard]] int max_operator_id() const { return _operator_id; }
 
-    [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; }
+    [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
     [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; 
}
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index eec4cddb321..1a6f8a2661f 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -469,22 +469,25 @@ int64_t RuntimeState::get_load_mem_limit() {
     }
 }
 
-void RuntimeState::resize_op_id_to_local_state(int operator_size, int 
sink_size) {
-    _op_id_to_local_state.resize(operator_size);
+void RuntimeState::resize_op_id_to_local_state(int operator_size) {
+    _op_id_to_local_state.resize(-operator_size);
 }
 
 void RuntimeState::emplace_local_state(
         int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> 
state) {
+    id = -id;
     DCHECK(id < _op_id_to_local_state.size());
     DCHECK(!_op_id_to_local_state[id]);
     _op_id_to_local_state[id] = std::move(state);
 }
 
 doris::pipeline::PipelineXLocalStateBase* RuntimeState::get_local_state(int 
id) {
+    id = -id;
     return _op_id_to_local_state[id].get();
 }
 
 Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) 
{
+    id = -id;
     if (id >= _op_id_to_local_state.size()) {
         return ResultError(Status::InternalError("get_local_state out of range 
size:{} , id:{}",
                                                  _op_id_to_local_state.size(), 
id));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1470ec89776..9777f594ff6 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -553,7 +553,7 @@ public:
 
     Result<SinkLocalState*> get_sink_local_state_result(int id);
 
-    void resize_op_id_to_local_state(int operator_size, int sink_size);
+    void resize_op_id_to_local_state(int operator_size);
 
     auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to