This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5bb720e4307 (feature)[pipelineX]Make operator_id negative in
pipelineX (#29649)
5bb720e4307 is described below
commit 5bb720e43073b6c11accb957ab9540bdadc8656c
Author: Mryange <[email protected]>
AuthorDate: Tue Jan 9 09:27:19 2024 +0800
(feature)[pipelineX]Make operator_id negative in pipelineX (#29649)
"operator_id" should be invisible, but the local shuffle is a planned
operator in the BE (Backend), without a plan node ID. We use the operator_id in profiles and
other places, where it might duplicate a real plan node ID. Therefore, we switch it to a
negative number here to distinguish it from a plan node ID.
---
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 4 ++--
be/src/runtime/runtime_state.cpp | 7 +++++--
be/src/runtime/runtime_state.h | 2 +-
4 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a8a576889d2..6b1d9af512b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -502,7 +502,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
runtime_state->set_desc_tbl(_desc_tbl);
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
runtime_state->set_num_per_fragment_instances(request.num_senders);
- runtime_state->resize_op_id_to_local_state(max_operator_id(),
max_sink_operator_id());
+ runtime_state->resize_op_id_to_local_state(max_operator_id());
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
runtime_state->set_total_load_streams(request.total_load_streams);
runtime_state->set_num_local_sink(request.num_local_sink);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index c8f9ef1edc6..34320d64f38 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -109,11 +109,11 @@ public:
return _runtime_filter_mgr_map[fragment_instance_id].get();
}
- [[nodiscard]] int next_operator_id() { return _operator_id++; }
+ [[nodiscard]] int next_operator_id() { return _operator_id--; }
[[nodiscard]] int max_operator_id() const { return _operator_id; }
- [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; }
+ [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
[[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id;
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index eec4cddb321..1a6f8a2661f 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -469,22 +469,25 @@ int64_t RuntimeState::get_load_mem_limit() {
}
}
-void RuntimeState::resize_op_id_to_local_state(int operator_size, int
sink_size) {
- _op_id_to_local_state.resize(operator_size);
+void RuntimeState::resize_op_id_to_local_state(int operator_size) {
+ _op_id_to_local_state.resize(-operator_size);
}
void RuntimeState::emplace_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>
state) {
+ id = -id;
DCHECK(id < _op_id_to_local_state.size());
DCHECK(!_op_id_to_local_state[id]);
_op_id_to_local_state[id] = std::move(state);
}
doris::pipeline::PipelineXLocalStateBase* RuntimeState::get_local_state(int
id) {
+ id = -id;
return _op_id_to_local_state[id].get();
}
Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id)
{
+ id = -id;
if (id >= _op_id_to_local_state.size()) {
return ResultError(Status::InternalError("get_local_state out of range
size:{} , id:{}",
_op_id_to_local_state.size(),
id));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1470ec89776..9777f594ff6 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -553,7 +553,7 @@ public:
Result<SinkLocalState*> get_sink_local_state_result(int id);
- void resize_op_id_to_local_state(int operator_size, int sink_size);
+ void resize_op_id_to_local_state(int operator_size);
auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]