This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb0cedeee8b [pipelineX](refactor) Rebuild relationship between dep and operator (#31487)
bb0cedeee8b is described below
commit bb0cedeee8b7902b04ec8fb05ace0f3bba480438
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 29 09:58:27 2024 +0800
[pipelineX](refactor) Rebuild relationship between dep and operator (#31487)
---
be/src/pipeline/exec/analytic_source_operator.h | 1 -
be/src/pipeline/exec/es_scan_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 7 --
be/src/pipeline/exec/exchange_sink_operator.h | 18 ++-
be/src/pipeline/exec/exchange_source_operator.cpp | 5 +-
be/src/pipeline/exec/exchange_source_operator.h | 11 +-
be/src/pipeline/exec/file_scan_operator.cpp | 2 +-
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 11 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 4 +-
be/src/pipeline/exec/scan_operator.cpp | 28 +++--
be/src/pipeline/exec/scan_operator.h | 9 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +
be/src/pipeline/exec/set_sink_operator.cpp | 2 -
be/src/pipeline/exec/set_sink_operator.h | 5 +-
be/src/pipeline/exec/set_source_operator.cpp | 7 +-
be/src/pipeline/exec/set_source_operator.h | 6 +-
be/src/pipeline/exec/union_sink_operator.h | 13 ++
be/src/pipeline/exec/union_source_operator.cpp | 32 +++--
be/src/pipeline/exec/union_source_operator.h | 4 +
be/src/pipeline/pipeline_x/dependency.cpp | 29 +++--
be/src/pipeline/pipeline_x/dependency.h | 87 ++++++--------
.../local_exchange_source_operator.cpp | 1 -
be/src/pipeline/pipeline_x/operator.cpp | 131 ++++++---------------
be/src/pipeline/pipeline_x/operator.h | 35 +++---
.../pipeline_x/pipeline_x_fragment_context.cpp | 15 ++-
.../pipeline_x/pipeline_x_fragment_context.h | 2 -
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 44 +++----
be/src/pipeline/pipeline_x/pipeline_x_task.h | 70 +++++------
28 files changed, 266 insertions(+), 317 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index e98a50186e9..b2ab5b24b3c 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -76,7 +76,6 @@ private:
if (need_more_input) {
_dependency->block();
_dependency->set_ready_to_write();
- _shared_state->sink_dep->set_ready();
} else {
_dependency->set_block_to_write();
_dependency->set_ready();
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index 96283ac5052..c00ee6917ea 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -67,7 +67,7 @@ Status EsScanLocalState::_process_conjuncts() {
Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index a43cc07b92a..1220230a343 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -170,12 +170,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
id, p._dest_node_id, _sender_id, _state->be_number(), state);
register_channels(_sink_buffer.get());
- auto* _exchange_sink_dependency = _dependency;
_queue_dependency =
Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"ExchangeSinkQueueDependency", true,
state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
- _exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() ==
1) &&
!only_local_exchange) {
_broadcast_dependency =
@@ -186,7 +184,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
- _exchange_sink_dependency->add_child(_broadcast_dependency);
_wait_broadcast_buffer_timer =
ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer",
timer_name);
@@ -194,19 +191,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
size_t dep_id = 0;
_local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
- auto deps_for_channels = AndDependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] =
channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
-
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), timer_name);
dep_id++;
}
}
- _exchange_sink_dependency->add_child(deps_for_channels);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 43919236a86..4a2f1a3dfd9 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -64,9 +64,9 @@ private:
int _mult_cast_id = -1;
};
-class ExchangeSinkLocalState final : public
PipelineXSinkLocalState<AndSharedState> {
+class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
- using Base = PipelineXSinkLocalState<AndSharedState>;
+ using Base = PipelineXSinkLocalState<>;
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
@@ -79,6 +79,16 @@ public:
state->get_query_ctx());
}
+ std::vector<Dependency*> dependencies() const override {
+ std::vector<Dependency*> dep_vec;
+ dep_vec.push_back(_queue_dependency.get());
+ if (_broadcast_dependency) {
+ dep_vec.push_back(_broadcast_dependency.get());
+ }
+ std::for_each(_local_channels_dependency.begin(),
_local_channels_dependency.end(),
+ [&](std::shared_ptr<Dependency> dep) {
dep_vec.push_back(dep.get()); });
+ return dep_vec;
+ }
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
@@ -154,8 +164,8 @@ private:
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
- std::shared_ptr<Dependency> _queue_dependency;
- std::shared_ptr<Dependency> _broadcast_dependency;
+ std::shared_ptr<Dependency> _queue_dependency = nullptr;
+ std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
/**
* We use this to control the execution for local exchange.
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 2c519319dd0..664e576e1ce 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(), p.num_senders(),
profile(), p.is_merging());
- auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@@ -82,10 +81,8 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
deps[i] = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"SHUFFLE_DATA_DEPENDENCY",
state->get_query_ctx());
queues[i]->set_dependency(deps[i]);
- source_dependency->add_child(deps[i]);
}
- static const std::string timer_name =
- "WaitForDependency[" + source_dependency->name() + "]Time";
+ static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
metrics[i] = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile,
fmt::format("WaitForData{}", i),
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 1b106f2e259..6176ad5b7f7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -51,17 +51,24 @@ public:
};
class ExchangeSourceOperatorX;
-class ExchangeLocalState final : public PipelineXLocalState<AndSharedState> {
+class ExchangeLocalState final : public PipelineXLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
public:
- using Base = PipelineXLocalState<AndSharedState>;
+ using Base = PipelineXLocalState<>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;
+
+ std::vector<Dependency*> dependencies() const override {
+ std::vector<Dependency*> dep_vec;
+ std::for_each(deps.begin(), deps.end(),
+ [&](std::shared_ptr<Dependency> dep) {
dep_vec.push_back(dep.get()); });
+ return dep_vec;
+ }
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index ce0c5042cff..ac193147dfb 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -33,7 +33,7 @@ namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index d9a29dfa0d4..b4886f089ef 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -92,11 +92,16 @@ public:
const RowDescriptor& row_desc() const override { return _row_desc; }
- std::shared_ptr<MultiCastSharedState> create_multi_cast_data_streamer() {
- auto multi_cast_data_streamer =
+ std::shared_ptr<BasicSharedState> create_shared_state() const override {
+ std::shared_ptr<BasicSharedState> ss =
std::make_shared<MultiCastSharedState>(_row_desc, _pool,
_cast_sender_count);
- return multi_cast_data_streamer;
+ ss->id = operator_id();
+ for (auto& dest : dests_id()) {
+ ss->related_op_ids.insert(dest);
+ }
+ return ss;
}
+
const TMultiCastDataStreamSink& sink_node() { return _sink; }
private:
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index f443cacf040..0aab714449e 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -225,7 +225,7 @@ bool OlapScanLocalState::_storage_no_merge() {
Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);
@@ -486,7 +486,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
}
if (eos) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
}
for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index e3b0fe38a1d..0fb92d793a8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -111,7 +111,12 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo&
info) {
- RETURN_IF_ERROR(PipelineXLocalState<EmptySharedState>::init(state, info));
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ _scan_dependency =
+ Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+ _parent->get_name() + "_DEPENDENCY",
state->get_query_ctx());
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" + _scan_dependency->name()
+ "]Time", 1);
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<typename Derived::Parent>();
@@ -252,7 +257,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
[&](auto&& range) {
if (range.is_empty_value_range()) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
}
},
it.second.second);
@@ -543,8 +548,7 @@ template <typename Derived>
std::string ScanLocalState<Derived>::debug_string(int indentation_level) const
{
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, _eos = {}",
-
PipelineXLocalState<EmptySharedState>::debug_string(indentation_level),
- _eos.load());
+ PipelineXLocalState<>::debug_string(indentation_level),
_eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
fmt::format_to(debug_string_buffer, ", Scanner Context: {}",
_scanner_ctx->debug_string());
@@ -587,7 +591,7 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
}
} else if (const vectorized::ColumnVector<vectorized::UInt8>*
bool_column =
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -605,7 +609,7 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
}
} else {
LOG(WARNING) << "Constant predicate in scan node should return
a bool column with "
@@ -803,7 +807,7 @@ Status
ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
auto fn_name = std::string("");
if (!is_fixed_range && state->null_in_set) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
}
while (iter->has_next()) {
// column not in (nullptr) is always true
@@ -1201,7 +1205,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
}
if (scanners.empty()) {
_eos = true;
- _dependency->set_ready();
+ _scan_dependency->set_ready();
} else {
for (auto& scanner : scanners) {
scanner->set_query_statistics(_query_statistics.get());
@@ -1218,7 +1222,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- state()->scan_queue_mem_limit(), _dependency->shared_from_this());
+ state()->scan_queue_mem_limit(), _scan_dependency);
return Status::OK();
}
@@ -1404,7 +1408,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
if (_closed) {
return Status::OK();
}
- COUNTER_UPDATE(exec_time_counter(), _dependency->watcher_elapse_time());
+ COUNTER_UPDATE(exec_time_counter(),
_scan_dependency->watcher_elapse_time());
COUNTER_UPDATE(exec_time_counter(),
_filter_dependency->watcher_elapse_time());
SCOPED_TIMER(_close_timer);
@@ -1412,10 +1416,10 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
if (_scanner_ctx) {
_scanner_ctx->stop_scanners(state);
}
- COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
- return PipelineXLocalState<EmptySharedState>::close(state);
+ return PipelineXLocalState<>::close(state);
}
template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 0b1b2f46da7..958581b2f6f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -57,11 +57,10 @@ public:
std::string debug_string() const override;
};
-class ScanLocalStateBase : public PipelineXLocalState<EmptySharedState>,
- public vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<>, public
vectorized::RuntimeFilterConsumer {
public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
- : PipelineXLocalState<EmptySharedState>(state, parent),
+ : PipelineXLocalState<>(state, parent),
vectorized::RuntimeFilterConsumer(parent->node_id(),
parent->runtime_filter_descs(),
parent->row_descriptor(),
_conjuncts) {}
virtual ~ScanLocalStateBase() = default;
@@ -97,6 +96,8 @@ protected:
std::atomic<bool> _opened {false};
+ DependencySPtr _scan_dependency = nullptr;
+
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
@@ -166,6 +167,8 @@ class ScanLocalState : public ScanLocalStateBase {
RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
+ std::vector<Dependency*> dependencies() const override { return
{_scan_dependency.get()}; }
+
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 2e8500cd452..6b4197ea94b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -131,6 +131,8 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
+ std::shared_ptr<BasicSharedState> create_shared_state() const override {
return nullptr; }
+
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>&
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 9080bb24504..7ef4871555d 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -198,10 +198,8 @@ Status SetSinkOperatorX<is_intersect>::init(const
TPlanNode& tnode, RuntimeState
// Create result_expr_ctx_lists_ from thrift exprs.
if (tnode.node_type == TPlanNodeType::type::INTERSECT_NODE) {
result_texpr_lists = &(tnode.intersect_node.result_expr_lists);
- _child_quantity = tnode.intersect_node.result_expr_lists.size();
} else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) {
result_texpr_lists = &(tnode.except_node.result_expr_lists);
- _child_quantity = tnode.except_node.result_expr_lists.size();
} else {
return Status::NotSupported("Not Implemented, Check The Operation
Node.");
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 63b2b89380b..24f23593ea0 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -99,6 +99,9 @@ public:
const DescriptorTbl& descs)
: Base(sink_id, tnode.node_id, tnode.node_id),
_cur_child_id(child_id),
+ _child_quantity(tnode.node_type ==
TPlanNodeType::type::INTERSECT_NODE
+ ?
tnode.intersect_node.result_expr_lists.size()
+ :
tnode.except_node.result_expr_lists.size()),
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ?
tnode.intersect_node.result_expr_lists[child_id]
@@ -131,7 +134,7 @@ private:
vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs);
const int _cur_child_id;
- int _child_quantity;
+ const int _child_quantity;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index 954ca28dc9b..15524a25a7b 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -54,11 +54,8 @@ Status SetSourceLocalState<is_intersect>::init(RuntimeState*
state, LocalStateIn
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- auto& deps = info.upstream_dependencies;
- _shared_state->probe_finished_children_dependency.resize(deps.size(),
nullptr);
- for (auto& dep : deps) {
- dep->set_shared_state(_dependency->shared_state());
- }
+ _shared_state->probe_finished_children_dependency.resize(
+ _parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity,
nullptr);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index d7026f015cf..1c5cf162940 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -85,7 +85,10 @@ public:
SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs)
- : Base(pool, tnode, operator_id, descs) {};
+ : Base(pool, tnode, operator_id, descs),
+ _child_quantity(tnode.node_type ==
TPlanNodeType::type::INTERSECT_NODE
+ ?
tnode.intersect_node.result_expr_lists.size()
+ :
tnode.except_node.result_expr_lists.size()) {};
~SetSourceOperatorX() override = default;
[[nodiscard]] bool is_source() const override { return true; }
@@ -105,6 +108,7 @@ private:
void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
vectorized::RowRefListWithFlags& value, int&
block_size);
+ const int _child_quantity;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index 5fd670f0ef7..6d79d3f2a9f 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -111,6 +111,19 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
+ std::shared_ptr<BasicSharedState> create_shared_state() const override {
+ if (_cur_child_id > 0) {
+ return nullptr;
+ } else {
+ std::shared_ptr<BasicSharedState> ss =
std::make_shared<UnionSharedState>(_child_size);
+ ss->id = operator_id();
+ for (auto& dest : dests_id()) {
+ ss->related_op_ids.insert(dest);
+ }
+ return ss;
+ }
+ }
+
private:
int _get_first_materialized_child_idx() const { return
_first_materialized_child_idx; }
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index f2f4ca82e4c..dc1de2900b9 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -111,15 +111,18 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
- int child_count = p.get_child_count();
- if (child_count != 0) {
- auto& deps = info.upstream_dependencies;
- for (auto& dep : deps) {
- dep->set_shared_state(_dependency->shared_state());
- }
+ if (p.get_child_count() != 0) {
+ ((UnionSharedState*)_dependency->shared_state())
+
->data_queue.set_source_dependency(_shared_state->source_deps.front());
+ } else {
+ _only_const_dependency = Dependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY",
+ state->get_query_ctx());
+ _dependency = _only_const_dependency.get();
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
}
- ((UnionSharedState*)_dependency->shared_state())
- ->data_queue.set_source_dependency(info.dependency);
+
// Const exprs materialized by this node. These exprs don't refer to any
children.
// Only materialized by the first fragment instance to avoid duplication.
if (state->per_fragment_instance_idx() == 0) {
@@ -138,7 +141,8 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(clone_expr_list(_const_expr_list,
other_expr_list));
}
}
- if (child_count == 0) {
+
+ if (p.get_child_count() == 0) {
_dependency->set_ready();
}
return Status::OK();
@@ -147,9 +151,11 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
std::string UnionSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {},
has_data = {})",
- _shared_state->data_queue.is_all_finish(),
- _shared_state->data_queue.remaining_has_data());
+ if (_shared_state) {
+ fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish =
{}, has_data = {})",
+ _shared_state->data_queue.is_all_finish(),
+ _shared_state->data_queue.remaining_has_data());
+ }
return fmt::to_string(debug_string_buffer);
}
@@ -161,7 +167,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
RETURN_IF_ERROR(get_next_const(state, block));
}
local_state._need_read_for_const_expr = has_more_const(state);
- } else {
+ } else if (_child_size != 0) {
std::unique_ptr<vectorized::Block> output_block =
vectorized::Block::create_unique();
int child_idx = 0;
RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block,
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 69e81bcd4fc..40d02324cbd 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -87,6 +87,10 @@ private:
bool _need_read_for_const_expr {true};
int _const_expr_list_idx {0};
std::vector<vectorized::VExprContextSPtrs> _const_expr_lists;
+
+ // If this operator has no children, there is no shared state to own the dependency,
+ // so this local state holds the dependency itself.
+ DependencySPtr _only_const_dependency = nullptr;
};
class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 56045118a94..adbadcfb835 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -29,6 +29,22 @@
namespace doris::pipeline {
+Dependency* BasicSharedState::create_source_dependency(int operator_id, int
node_id,
+ std::string name,
QueryContext* ctx) {
+ source_deps.push_back(
+ std::make_shared<Dependency>(operator_id, node_id, name +
"_DEPENDENCY", ctx));
+ source_deps.back()->set_shared_state(this);
+ return source_deps.back().get();
+}
+
+Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
std::string name,
+ QueryContext* ctx) {
+ sink_deps.push_back(
+ std::make_shared<Dependency>(dest_id, node_id, name +
"_DEPENDENCY", true, ctx));
+ sink_deps.back()->set_shared_state(this);
+ return sink_deps.back().get();
+}
+
void Dependency::_add_block_task(PipelineXTask* task) {
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] !=
task)
<< "Duplicate task: " << task->debug_string();
@@ -103,17 +119,6 @@ std::string RuntimeFilterDependency::debug_string(int
indentation_level) {
return fmt::to_string(debug_string_buffer);
}
-std::string AndDependency::debug_string(int indentation_level) {
- fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
- std::string(indentation_level * 2, ' '), _name, _node_id);
- for (auto& child : _children) {
- fmt::format_to(debug_string_buffer, "{}, \n",
child->debug_string(indentation_level = 1));
- }
- fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level *
2, ' '));
- return fmt::to_string(debug_string_buffer);
-}
-
bool RuntimeFilterTimer::has_ready() {
std::unique_lock<std::mutex> lc(_lock);
return _is_ready;
@@ -193,7 +198,7 @@ void LocalExchangeSharedState::sub_running_sink_operators()
{
}
LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
- source_dependencies.resize(num_instances, nullptr);
+ source_deps.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
}
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 3de90fa915a..1733a8fd291 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -70,9 +70,18 @@ struct BasicSharedState {
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<const TARGET*>(this);
}
- DependencySPtr source_dep = nullptr;
- DependencySPtr sink_dep = nullptr;
+ std::vector<DependencySPtr> source_deps;
+ std::vector<DependencySPtr> sink_deps;
+ int id = 0;
+ std::set<int> related_op_ids;
+
virtual ~BasicSharedState() = default;
+
+ Dependency* create_source_dependency(int operator_id, int node_id,
std::string name,
+ QueryContext* ctx);
+
+ Dependency* create_sink_dependency(int dest_id, int node_id, std::string
name,
+ QueryContext* ctx);
};
class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -94,22 +103,15 @@ public:
_query_ctx(query_ctx) {}
virtual ~Dependency() = default;
+ bool is_write_dependency() const { return _is_write_dependency; }
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
- virtual void add_child(std::shared_ptr<Dependency> child) {
- LOG(FATAL) << "Only AndDependency could add child, it is wrong usage";
- }
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state =
shared_state; }
virtual std::string debug_string(int indentation_level = 0);
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
- void start_watcher() {
- for (auto& child : _children) {
- child->start_watcher();
- }
- _watcher.start();
- }
+ void start_watcher() { _watcher.start(); }
[[nodiscard]] int64_t watcher_elapse_time() { return
_watcher.elapsed_time(); }
// Which dependency current pipeline task is blocked by. `nullptr` if this
dependency is ready.
@@ -118,21 +120,21 @@ public:
void set_ready();
void set_ready_to_read() {
DCHECK(_is_write_dependency) << debug_string();
- DCHECK(_shared_state->source_dep != nullptr) << debug_string();
- _shared_state->source_dep->set_ready();
+ DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ _shared_state->source_deps.front()->set_ready();
}
void set_block_to_read() {
DCHECK(_is_write_dependency) << debug_string();
- DCHECK(_shared_state->source_dep != nullptr) << debug_string();
- _shared_state->source_dep->block();
+ DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ _shared_state->source_deps.front()->block();
}
void set_ready_to_write() {
- DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
- _shared_state->sink_dep->set_ready();
+ DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ _shared_state->sink_deps.front()->set_ready();
}
void set_block_to_write() {
- DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
- _shared_state->sink_dep->block();
+ DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ _shared_state->sink_deps.front()->block();
}
// Notify downstream pipeline tasks this dependency is blocked.
@@ -172,7 +174,6 @@ protected:
BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
- std::list<std::shared_ptr<Dependency>> _children;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
@@ -322,31 +323,6 @@ protected:
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
};
-struct EmptySharedState final : public BasicSharedState {};
-
-struct AndSharedState final : public BasicSharedState {};
-
-class AndDependency final : public Dependency {
-public:
- using SharedState = AndSharedState;
- ENABLE_FACTORY_CREATOR(AndDependency);
- AndDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "AndDependency", query_ctx) {}
-
- std::string debug_string(int indentation_level = 0) override;
-
- void add_child(std::shared_ptr<Dependency> child) override {
_children.push_back(child); }
-
- [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
- for (auto& child : Dependency::_children) {
- if (auto* dep = child->is_blocked_by(task)) {
- return dep;
- }
- }
- return nullptr;
- }
-};
-
struct AggSharedState : public BasicSharedState {
public:
AggSharedState() {
@@ -661,25 +637,28 @@ public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
std::unique_ptr<Exchanger> exchanger {};
- std::vector<DependencySPtr> source_dependencies;
- DependencySPtr sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
+ void create_source_dependencies(int operator_id, int node_id,
QueryContext* ctx) {
+ for (size_t i = 0; i < source_deps.size(); i++) {
+ source_deps[i] = std::make_shared<Dependency>(
+ operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
+ source_deps[i]->set_shared_state(this);
+ }
+ };
void sub_running_sink_operators();
void _set_always_ready() {
- for (auto& dep : source_dependencies) {
+ for (auto& dep : source_deps) {
DCHECK(dep);
dep->set_always_ready();
}
}
- void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
- source_dependencies[channel_id] = dep;
- }
+ Dependency* get_dep_by_channel_id(int channel_id) { return
source_deps[channel_id].get(); }
void set_ready_to_read(int channel_id) {
- auto& dep = source_dependencies[channel_id];
+ auto& dep = source_deps[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();
}
@@ -700,13 +679,13 @@ public:
void add_total_mem_usage(size_t delta) {
if (mem_usage.fetch_add(delta) >
config::local_exchange_buffer_mem_limit) {
- sink_dependency->block();
+ sink_deps.front()->block();
}
}
void sub_total_mem_usage(size_t delta) {
if (mem_usage.fetch_sub(delta) <=
config::local_exchange_buffer_mem_limit) {
- sink_dependency->set_ready();
+ sink_deps.front()->set_ready();
}
}
};
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 568871835c6..71a5a6b3c13 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -26,7 +26,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
- _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 433e4b48654..30ff16dde80 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -267,80 +267,24 @@ Status
DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
return Status::OK();
}
-template <typename SharedStateType>
-constexpr bool NeedToCreate = true;
-template <>
-inline constexpr bool NeedToCreate<MultiCastSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<SetSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<UnionSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
-
template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(
- vector<DependencySPtr>& dependency,
- std::map<int, std::shared_ptr<BasicSharedState>>& shared_states,
QueryContext* ctx) {
- std::shared_ptr<BasicSharedState> ss = nullptr;
- if constexpr (NeedToCreate<typename LocalStateType::SharedStateType>) {
- ss.reset(new typename LocalStateType::SharedStateType());
- DCHECK(!shared_states.contains(dests_id().front()));
- if constexpr (!std::is_same_v<typename
LocalStateType::SharedStateType, FakeSharedState>) {
- shared_states.insert({dests_id().front(), ss});
- }
+std::shared_ptr<BasicSharedState>
DataSinkOperatorX<LocalStateType>::create_shared_state() const {
+ if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
+ LocalExchangeSharedState>) {
+ return nullptr;
} else if constexpr (std::is_same_v<typename
LocalStateType::SharedStateType,
MultiCastSharedState>) {
- ss =
((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer();
- auto& dests = dests_id();
- for (auto& dest_id : dests) {
- DCHECK(!shared_states.contains(dest_id));
- shared_states.insert({dest_id, ss});
- }
- }
- if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
AndSharedState>) {
- auto& dests = dests_id();
- for (auto& dest_id : dests) {
- dependency.push_back(std::make_shared<AndDependency>(dest_id,
_node_id, ctx));
- dependency.back()->set_shared_state(ss.get());
- }
- } else if constexpr (!std::is_same_v<typename
LocalStateType::SharedStateType,
- FakeSharedState>) {
- auto& dests = dests_id();
- for (auto& dest_id : dests) {
- dependency.push_back(std::make_shared<Dependency>(dest_id,
_node_id,
- _name +
"_DEPENDENCY", true, ctx));
- dependency.back()->set_shared_state(ss.get());
- }
+ LOG(FATAL) << "should not reach here!";
+ return nullptr;
} else {
- dependency.push_back(nullptr);
- }
-}
-
-template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency(
- QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>&
shared_states) {
- std::shared_ptr<BasicSharedState> ss = nullptr;
- if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
SetSharedState>) {
+ std::shared_ptr<BasicSharedState> ss = nullptr;
ss.reset(new typename LocalStateType::SharedStateType());
- shared_states.insert({operator_id(), ss});
- } else if constexpr (std::is_same_v<typename
LocalStateType::SharedStateType,
- UnionSharedState>) {
- ss.reset(new typename LocalStateType::SharedStateType(
- ((UnionSourceOperatorX*)this)->get_child_count()));
- shared_states.insert({operator_id(), ss});
- }
- DependencySPtr dep = nullptr;
- if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
AndSharedState>) {
- dep = std::make_shared<AndDependency>(_operator_id, _node_id, ctx);
- } else if constexpr (std::is_same_v<typename
LocalStateType::SharedStateType,
- FakeSharedState>) {
- dep = std::make_shared<FakeDependency>(_operator_id, _node_id, ctx);
- } else {
- dep = std::make_shared<Dependency>(_operator_id, _node_id, _op_name +
"_DEPENDENCY", ctx);
- dep->set_shared_state(ss.get());
+ ss->id = operator_id();
+ for (auto& dest : dests_id()) {
+ ss->related_op_ids.insert(dest);
+ }
+ return ss;
}
- return dep;
}
template <typename LocalStateType>
@@ -373,25 +317,22 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg,
FakeSharedState>;
- _dependency = info.dependency.get();
if constexpr (!is_fake_shared) {
- _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
- _runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
- auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedStateArg>) {
-
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get());
- _shared_state = _dependency->shared_state()->template
cast<SharedStateArg>();
-
- _shared_state->source_dep = info.dependency;
- } else if constexpr (!std::is_same_v<SharedStateArg, EmptySharedState>
&&
- !std::is_same_v<SharedStateArg, AndSharedState>) {
- _dependency->set_shared_state(info.shared_state);
- _shared_state = _dependency->shared_state()->template
cast<SharedStateArg>();
-
- _shared_state->source_dep = info.dependency;
- if (!deps.empty()) {
- _shared_state->sink_dep = deps.front();
- }
+ _shared_state =
info.le_state_map[_parent->operator_id()].first.get();
+
+ _dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
+ } else if (info.shared_state) {
+ // For UnionSourceOperator without children, there is no shared
state.
+ _shared_state = info.shared_state->template cast<SharedStateArg>();
+
+ _dependency = _shared_state->create_source_dependency(
+ _parent->operator_id(), _parent->node_id(),
_parent->get_name(),
+ state->get_query_ctx());
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
}
}
@@ -445,20 +386,19 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_wait_for_finish_dependency_timer = ADD_TIMER(_profile,
"PendingFinishDependency");
constexpr auto is_fake_shared = std::is_same_v<SharedState,
FakeSharedState>;
if constexpr (!is_fake_shared) {
- auto& deps = info.dependencies;
- _dependency = deps.front().get();
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
_dependency =
info.le_state_map[_parent->dests_id().front()].second.get();
- }
- if (_dependency) {
_shared_state = (SharedState*)_dependency->shared_state();
- _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
- _profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
+ } else {
+ _shared_state = info.shared_state->template cast<SharedState>();
+ _dependency = _shared_state->create_sink_dependency(
+ _parent->dests_id().front(), _parent->node_id(),
_parent->get_name(),
+ state->get_query_ctx());
}
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
} else {
- auto& deps = info.dependencies;
- deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
- _dependency = deps.front().get();
+ _dependency = nullptr;
}
_rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows",
TUnit::UNIT, 1);
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
@@ -656,7 +596,6 @@ template class PipelineXSinkLocalState<UnionSharedState>;
template class PipelineXSinkLocalState<PartitionSortNodeSharedState>;
template class PipelineXSinkLocalState<MultiCastSharedState>;
template class PipelineXSinkLocalState<SetSharedState>;
-template class PipelineXSinkLocalState<AndSharedState>;
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
template class PipelineXSinkLocalState<BasicSharedState>;
@@ -671,8 +610,6 @@ template class PipelineXLocalState<MultiCastSharedState>;
template class PipelineXLocalState<PartitionSortNodeSharedState>;
template class PipelineXLocalState<SetSharedState>;
template class PipelineXLocalState<LocalExchangeSharedState>;
-template class PipelineXLocalState<EmptySharedState>;
-template class PipelineXLocalState<AndSharedState>;
template class PipelineXLocalState<BasicSharedState>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 5d1268e50c9..b9c02a935a6 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -31,13 +31,10 @@ namespace doris::pipeline {
struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
const std::vector<TScanRangeParams> scan_ranges;
- std::vector<DependencySPtr>& upstream_dependencies;
BasicSharedState* shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
le_state_map;
const int task_idx;
-
- DependencySPtr dependency;
};
// This struct is used only for initializing local sink state.
@@ -45,7 +42,7 @@ struct LocalSinkStateInfo {
const int task_idx;
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
- std::vector<DependencySPtr>& dependencies;
+ BasicSharedState* shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
le_state_map;
const TDataSink& tsink;
@@ -100,7 +97,7 @@ public:
[[nodiscard]] virtual std::string debug_string(int indentation_level = 0)
const = 0;
- virtual Dependency* dependency() { return nullptr; }
+ virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
@@ -184,8 +181,6 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- [[nodiscard]] virtual DependencySPtr get_dependency(
- QueryContext* ctx, std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states) = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -348,10 +343,6 @@ public:
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
return state->get_local_state(operator_id())->template
cast<LocalState>();
}
-
- DependencySPtr get_dependency(
- QueryContext* ctx,
- std::map<int, std::shared_ptr<BasicSharedState>>& shared_states)
override;
};
template <typename SharedStateArg = FakeSharedState>
@@ -372,7 +363,9 @@ public:
[[nodiscard]] std::string debug_string(int indentation_level = 0) const
override;
- Dependency* dependency() override { return _dependency; }
+ std::vector<Dependency*> dependencies() const override {
+ return _dependency ? std::vector<Dependency*> {_dependency} :
std::vector<Dependency*> {};
+ }
protected:
Dependency* _dependency = nullptr;
@@ -422,7 +415,7 @@ public:
RuntimeProfile::Counter* rows_input_counter() { return
_rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
- virtual Dependency* dependency() { return nullptr; }
+ virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
@@ -513,9 +506,7 @@ public:
return reinterpret_cast<const TARGET&>(*this);
}
- virtual void get_dependency(std::vector<DependencySPtr>& dependency,
- std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states,
- QueryContext* ctx) = 0;
+ [[nodiscard]] virtual std::shared_ptr<BasicSharedState>
create_shared_state() const = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -612,9 +603,7 @@ public:
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
- void get_dependency(std::vector<DependencySPtr>& dependency,
- std::map<int, std::shared_ptr<BasicSharedState>>&
shared_states,
- QueryContext* ctx) override;
+ std::shared_ptr<BasicSharedState> create_shared_state() const override;
using LocalState = LocalStateType;
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
@@ -640,7 +629,9 @@ public:
virtual std::string name_suffix() { return " (id=" +
std::to_string(_parent->node_id()) + ")"; }
- Dependency* dependency() override { return _dependency; }
+ std::vector<Dependency*> dependencies() const override {
+ return _dependency ? std::vector<Dependency*> {_dependency} :
std::vector<Dependency*> {};
+ }
protected:
Dependency* _dependency = nullptr;
@@ -717,7 +708,9 @@ public:
Status sink(RuntimeState* state, vectorized::Block* block, bool eos);
- Dependency* dependency() override { return _async_writer_dependency.get();
}
+ std::vector<Dependency*> dependencies() const override {
+ return {_async_writer_dependency.get()};
+ }
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get();
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f13cf37b1fb..0d1d7784f88 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -610,9 +610,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto& deps = _dag[_pipeline->id()];
for (auto& dep : deps) {
if (pipeline_id_to_task.contains(dep)) {
- task->add_upstream_dependency(
-
pipeline_id_to_task[dep]->get_downstream_dependency(),
-
pipeline_id_to_task[dep]->get_shared_states());
+ auto ss =
pipeline_id_to_task[dep]->get_sink_shared_state();
+ if (ss) {
+ task->inject_shared_state(ss);
+ } else {
+ pipeline_id_to_task[dep]->inject_shared_state(
+ task->get_source_shared_state());
+ }
}
}
}
@@ -781,7 +785,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
_runtime_state->get_query_ctx());
sink_dep->set_shared_state(shared_state.get());
- shared_state->sink_dependency = sink_dep;
+ shared_state->sink_deps.push_back(sink_dep);
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
// 3. Set two pipelines' operator list. For example, split pipeline [Scan
- AggSink] to
@@ -804,6 +808,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);
+ shared_state->create_source_dependencies(source_op->operator_id(),
source_op->node_id(),
+ _query_ctx.get());
+
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
std::vector<PipelineId> edges_with_source;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 9630484f443..1cbf4e4940e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -191,8 +191,6 @@ private:
#pragma clang diagnostic pop
#endif
- std::atomic_bool _canceled = false;
-
// `_dag` manage dependencies between pipelines by pipeline ID. the
indices will be blocked by members
std::map<PipelineId, std::vector<PipelineId>> _dag;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7c0e81c994f..0459f1e3a0e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -60,10 +60,10 @@ PipelineXTask::PipelineXTask(
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
- _sink->get_dependency(_downstream_dependency, _shared_states,
state->get_query_ctx());
- for (auto& op : _operators) {
- _source_dependency.insert(
- {op->operator_id(), op->get_dependency(state->get_query_ctx(),
_shared_states)});
+
+ auto shared_state = _sink->create_shared_state();
+ if (shared_state) {
+ _sink_shared_state = shared_state;
}
pipeline->incr_created_tasks();
}
@@ -82,7 +82,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams&
local_params, const
LocalSinkStateInfo info {_task_idx,
_task_profile.get(),
local_params.sender_id,
- get_downstream_dependency(),
+ get_sink_shared_state().get(),
_le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
@@ -97,14 +97,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams&
local_params, const
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
- auto& deps = get_upstream_dependency(op->operator_id());
- LocalStateInfo info {parent_profile,
- scan_ranges,
- deps,
- get_shared_state(op->operator_id()),
- _le_state_map,
- _task_idx,
- _source_dependency[op->operator_id()]};
+ LocalStateInfo info {parent_profile, scan_ranges,
get_op_shared_state(op->operator_id()),
+ _le_state_map, _task_idx};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
query_ctx->register_query_statistics(
@@ -126,9 +120,9 @@ Status PipelineXTask::_extract_dependencies() {
return result.error();
}
auto* local_state = result.value();
- auto* dep = local_state->dependency();
- DCHECK(dep != nullptr);
- _read_dependencies.push_back(dep);
+ const auto& deps = local_state->dependencies();
+ std::copy(deps.begin(), deps.end(),
+ std::inserter(_read_dependencies, _read_dependencies.end()));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
@@ -136,9 +130,9 @@ Status PipelineXTask::_extract_dependencies() {
}
{
auto* local_state = _state->get_sink_local_state();
- auto* dep = local_state->dependency();
- DCHECK(dep != nullptr);
- _write_dependencies = dep;
+ _write_dependencies = local_state->dependencies();
+ DCHECK(std::all_of(_write_dependencies.begin(),
_write_dependencies.end(),
+ [](auto* dep) { return dep->is_write_dependency();
}));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
@@ -302,10 +296,8 @@ void PipelineXTask::finalize() {
PipelineTask::finalize();
std::unique_lock<std::mutex> lc(_release_lock);
_finished = true;
- std::vector<DependencySPtr> {}.swap(_downstream_dependency);
- _upstream_dependency.clear();
- _source_dependency.clear();
- _shared_states.clear();
+ _sink_shared_state.reset();
+ _op_shared_states.clear();
_le_state_map.clear();
}
@@ -372,8 +364,10 @@ std::string PipelineXTask::debug_string() {
}
fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
- fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_write_dependencies->debug_string(1));
- i++;
+ for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+ _write_dependencies[j]->debug_string(i + 1));
+ }
if (_filter_dependency) {
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 6e8a3773751..8707de4b54c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -94,41 +94,36 @@ public:
bool is_pending_finish() override { return _finish_blocked_dependency() !=
nullptr; }
- std::vector<DependencySPtr>& get_downstream_dependency() { return
_downstream_dependency; }
- std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() {
return _shared_states; }
-
- void add_upstream_dependency(std::vector<DependencySPtr>&
multi_upstream_dependency,
- std::map<int,
std::shared_ptr<BasicSharedState>>& shared_states) {
- for (auto dep : multi_upstream_dependency) {
- int dst_id = dep->id();
- if (!_upstream_dependency.contains(dst_id)) {
- _upstream_dependency.insert({dst_id, {dep}});
- } else {
- _upstream_dependency[dst_id].push_back(dep);
- }
+ std::shared_ptr<BasicSharedState> get_source_shared_state() {
+ return _op_shared_states.contains(_source->operator_id())
+ ? _op_shared_states[_source->operator_id()]
+ : nullptr;
+ }
- if (shared_states.contains(dst_id) &&
!_shared_states.contains(dst_id)) {
- // Shared state is created by upstream task's sink operator
and shared by source operator of this task.
- _shared_states.insert({dst_id, shared_states[dst_id]});
- } else if (_shared_states.contains(dst_id) &&
!shared_states.contains(dst_id)) {
- // Shared state is created by this task's source operator and
shared by upstream task's sink operator.
- shared_states.insert({dst_id, _shared_states[dst_id]});
+ void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
+ if (!shared_state) {
+ return;
+ }
+ // Shared state is created by upstream task's sink operator and shared
by source operator of this task.
+ for (auto& op : _operators) {
+ if (shared_state->related_op_ids.contains(op->operator_id())) {
+ _op_shared_states.insert({op->operator_id(), shared_state});
+ return;
}
}
- }
-
- std::vector<DependencySPtr>& get_upstream_dependency(int id) {
- if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
- _upstream_dependency.insert({id, {}});
+ if (shared_state->related_op_ids.contains(_sink->dests_id().front())) {
+ DCHECK(_sink_shared_state == nullptr);
+ _sink_shared_state = shared_state;
}
- return _upstream_dependency[id];
}
- BasicSharedState* get_shared_state(int id) {
- if (!_shared_states.contains(id)) {
+ std::shared_ptr<BasicSharedState> get_sink_shared_state() { return
_sink_shared_state; }
+
+ BasicSharedState* get_op_shared_state(int id) {
+ if (!_op_shared_states.contains(id)) {
return nullptr;
}
- return _shared_states[id].get();
+ return _op_shared_states[id].get();
}
bool is_pipelineX() const override { return true; }
@@ -161,10 +156,12 @@ public:
private:
Dependency* _write_blocked_dependency() {
- _blocked_dep = _write_dependencies->is_blocked_by(this);
- if (_blocked_dep != nullptr) {
- static_cast<Dependency*>(_blocked_dep)->start_watcher();
- return _blocked_dep;
+ for (auto* op_dep : _write_dependencies) {
+ _blocked_dep = op_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return _blocked_dep;
+ }
}
return nullptr;
}
@@ -203,18 +200,13 @@ private:
DataSinkOperatorXPtr _sink;
std::vector<Dependency*> _read_dependencies;
- Dependency* _write_dependencies;
+ std::vector<Dependency*> _write_dependencies;
std::vector<Dependency*> _finish_dependencies;
RuntimeFilterDependency* _filter_dependency;
- // Write dependencies of upstream pipeline tasks.
- DependencyMap _upstream_dependency;
- // Read dependencies of this pipeline task.
- std::map<int, DependencySPtr> _source_dependency;
- // Write dependencies of this pipeline tasks.
- std::vector<DependencySPtr> _downstream_dependency;
// All shared states of this pipeline task.
- std::map<int, std::shared_ptr<BasicSharedState>> _shared_states;
+ std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
+ std::shared_ptr<BasicSharedState> _sink_shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
_le_state_map;
int _task_idx;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]