This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b0b440965c [refactor](pipeline) Delete pipeline option (#35943)
3b0b440965c is described below

commit 3b0b440965cfd331df1f8c6b02aaa85b02dd73a4
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Jun 6 15:39:47 2024 +0800

    [refactor](pipeline) Delete pipeline option (#35943)
---
 be/src/exprs/runtime_filter.cpp                    | 59 +++++++---------------
 be/src/exprs/runtime_filter.h                      | 26 +++-------
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  3 +-
 .../exec/streaming_aggregation_operator.cpp        |  3 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  1 -
 be/src/pipeline/pipeline_fragment_context.cpp      |  4 --
 be/src/runtime/fragment_mgr.cpp                    | 13 +----
 be/src/runtime/query_context.cpp                   | 28 ++++------
 be/src/runtime/query_context.h                     |  7 ---
 be/src/runtime/runtime_filter_mgr.cpp              |  2 -
 be/src/runtime/runtime_filter_mgr.h                |  1 -
 be/src/runtime/runtime_state.h                     |  6 ---
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 24 +++------
 be/src/vec/runtime/vdata_stream_recvr.h            |  1 -
 be/src/vec/runtime/vsorted_run_merger.cpp          | 27 ++++------
 be/src/vec/runtime/vsorted_run_merger.h            |  4 --
 16 files changed, 55 insertions(+), 154 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index dc999ae137e..2a98be965f6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1117,7 +1117,7 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr) {
     pfragment_instance_id->set_lo((int64_t)this);
 
     merge_filter_request->set_filter_id(_filter_id);
-    merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
+    merge_filter_request->set_is_pipeline(true);
     auto column_type = _wrapper->column_type();
     merge_filter_request->set_column_type(to_proto(column_type));
     merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
@@ -1170,35 +1170,21 @@ bool IRuntimeFilter::await() {
     int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
                                     ? execution_timeout
                                     : runtime_filter_wait_time_ms;
-    if (_enable_pipeline_exec) {
-        auto expected = _rf_state_atomic.load(std::memory_order_acquire);
-        if (expected == RuntimeFilterState::NOT_READY) {
-            if (!_rf_state_atomic.compare_exchange_strong(
-                        expected,
-                        MonotonicMillis() - registration_time_ >= wait_times_ms
-                                ? RuntimeFilterState::TIME_OUT
-                                : RuntimeFilterState::NOT_READY,
-                        std::memory_order_acq_rel)) {
-                DCHECK(expected == RuntimeFilterState::READY ||
-                       expected == RuntimeFilterState::TIME_OUT);
-                return (expected == RuntimeFilterState::READY);
-            }
-            return false;
-        } else if (expected == RuntimeFilterState::TIME_OUT) {
-            return false;
-        }
-    } else {
-        std::unique_lock lock(_inner_mutex);
-        if (_rf_state != RuntimeFilterState::READY) {
-            int64_t ms_since_registration = MonotonicMillis() - 
registration_time_;
-            int64_t ms_remaining = wait_times_ms - ms_since_registration;
-            _rf_state = RuntimeFilterState::TIME_OUT;
-            if (ms_remaining <= 0) {
-                return false;
-            }
-            return _inner_cv.wait_for(lock, 
std::chrono::milliseconds(ms_remaining),
-                                      [this] { return _rf_state == 
RuntimeFilterState::READY; });
+    auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+    if (expected == RuntimeFilterState::NOT_READY) {
+        if (!_rf_state_atomic.compare_exchange_strong(
+                    expected,
+                    MonotonicMillis() - registration_time_ >= wait_times_ms
+                            ? RuntimeFilterState::TIME_OUT
+                            : RuntimeFilterState::NOT_READY,
+                    std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY ||
+                   expected == RuntimeFilterState::TIME_OUT);
+            return (expected == RuntimeFilterState::READY);
         }
+        return false;
+    } else if (expected == RuntimeFilterState::TIME_OUT) {
+        return false;
     }
     return true;
 }
@@ -1212,7 +1198,6 @@ void IRuntimeFilter::update_state() {
                                     ? execution_timeout
                                     : runtime_filter_wait_time_ms;
     auto expected = _rf_state_atomic.load(std::memory_order_acquire);
-    DCHECK(_enable_pipeline_exec);
     // In pipelineX, runtime filters will be ready or timeout before open phase.
     if (expected == RuntimeFilterState::NOT_READY) {
         DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
@@ -1234,17 +1219,11 @@ PrimitiveType IRuntimeFilter::column_type() const {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
-    if (_enable_pipeline_exec) {
-        _rf_state_atomic.store(RuntimeFilterState::READY);
-        if (!_filter_timer.empty()) {
-            for (auto& timer : _filter_timer) {
-                timer->call_ready();
-            }
+    _rf_state_atomic.store(RuntimeFilterState::READY);
+    if (!_filter_timer.empty()) {
+        for (auto& timer : _filter_timer) {
+            timer->call_ready();
         }
-    } else {
-        std::unique_lock lock(_inner_mutex);
-        _rf_state = RuntimeFilterState::READY;
-        _inner_cv.notify_all();
     }
 
     if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 982a6f6eb8b..a78d732b687 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -207,7 +207,6 @@ public:
               registration_time_(MonotonicMillis()),
               _wait_infinitely(_state->runtime_filter_wait_infinitely),
               _rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
-              _enable_pipeline_exec(_state->enable_pipeline_exec),
               _runtime_filter_type(get_runtime_filter_type(desc)),
               _profile(
                       new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, 
type = {})",
@@ -247,12 +246,10 @@ public:
     bool has_local_target() const { return _has_local_target; }
 
     bool is_ready() const {
-        return (!_enable_pipeline_exec && _rf_state == 
RuntimeFilterState::READY) ||
-               (_enable_pipeline_exec &&
-                _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY);
+        return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY;
     }
     RuntimeFilterState current_state() const {
-        return _enable_pipeline_exec ? 
_rf_state_atomic.load(std::memory_order_acquire) : _rf_state;
+        return _rf_state_atomic.load(std::memory_order_acquire);
     }
 
     bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
@@ -390,18 +387,11 @@ protected:
     void _set_push_down(bool push_down) { _is_push_down = push_down; }
 
     std::string _get_explain_state_string() const {
-        if (_enable_pipeline_exec) {
-            return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY
-                           ? "READY"
-                   : _rf_state_atomic.load(std::memory_order_acquire) ==
-                                   RuntimeFilterState::TIME_OUT
-                           ? "TIME_OUT"
-                           : "NOT_READY";
-        } else {
-            return _rf_state == RuntimeFilterState::READY      ? "READY"
-                   : _rf_state == RuntimeFilterState::TIME_OUT ? "TIME_OUT"
-                                                               : "NOT_READY";
-        }
+        return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY
+                       ? "READY"
+               : _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::TIME_OUT
+                       ? "TIME_OUT"
+                       : "NOT_READY";
     }
 
     RuntimeFilterParamsContext* _state = nullptr;
@@ -436,8 +426,6 @@ protected:
     const bool _wait_infinitely;
     const int32_t _rf_wait_time_ms;
 
-    const bool _enable_pipeline_exec;
-
     std::atomic<bool> _profile_init = false;
     // runtime filter type
     RuntimeFilterType _runtime_filter_type;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 41498fd94fa..7a4a9d9c951 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -743,8 +743,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
     // In case of : `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;`
     // only union with limit: we can short circuit query the pipeline exec engine.
-    _can_short_circuit =
-            tnode.agg_node.aggregate_functions.empty() && 
state->enable_pipeline_x_exec();
+    _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
 
     TSortInfo dummy;
     for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 837a33dc437..40b63783c12 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1148,8 +1148,7 @@ Status StreamingAggOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
     _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
     // In case of : `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;`
     // only union with limit: we can short circuit query the pipeline exec engine.
-    _can_short_circuit =
-            tnode.agg_node.aggregate_functions.empty() && 
state->enable_pipeline_x_exec();
+    _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
 
     TSortInfo dummy;
     for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 980078b8fe8..a7c6446be43 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -311,7 +311,6 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* 
state,
         child_block_suppliers.push_back(block_supplier);
     }
     RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
-    _merger->set_pipeline_engine_enabled(true);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a85b64c9154..72c06721d89 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -405,7 +405,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
                     _runtime_state->runtime_filter_wait_infinitely();
             filterparams->runtime_filter_wait_time_ms =
                     _runtime_state->runtime_filter_wait_time_ms();
-            filterparams->enable_pipeline_exec = 
_runtime_state->enable_pipeline_x_exec();
             filterparams->execution_timeout = 
_runtime_state->execution_timeout();
 
             filterparams->exec_env = ExecEnv::GetInstance();
@@ -1648,9 +1647,6 @@ std::string PipelineFragmentContext::debug_string() {
 std::vector<std::shared_ptr<TRuntimeProfileTree>>
 PipelineFragmentContext::collect_realtime_profile_x() const {
     std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
-    DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
-            << fmt::format("Query {} calling a pipeline X function, but its 
pipeline X is disabled",
-                           print_id(this->_query_id));
 
     // we do not have mutex to protect pipeline_id_to_profile
     // so we need to make sure this funciton is invoked after fragment context
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 41ba68e8cbb..8222b75cfc3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -710,10 +710,6 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
     SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
-    DCHECK((params.query_options.__isset.enable_pipeline_x_engine &&
-            params.query_options.enable_pipeline_x_engine) ||
-           (params.query_options.__isset.enable_pipeline_engine &&
-            params.query_options.enable_pipeline_engine));
     int64_t duration_ns = 0;
     std::shared_ptr<pipeline::PipelineFragmentContext> context =
             std::make_shared<pipeline::PipelineFragmentContext>(
@@ -866,13 +862,10 @@ void FragmentMgr::cancel_worker() {
             }
             for (auto it = _query_ctx_map.begin(); it != 
_query_ctx_map.end();) {
                 if (auto q_ctx = it->second.lock()) {
-                    if (q_ctx->is_timeout(now) && 
q_ctx->enable_pipeline_x_exec()) {
+                    if (q_ctx->is_timeout(now)) {
                         LOG_WARNING("Query {} is timeout", 
print_id(it->first));
                         queries_timeout.push_back(it->first);
                         ++it;
-                    } else if (q_ctx->is_timeout(now)) {
-                        LOG_WARNING("Query {} is timeout", 
print_id(it->first));
-                        it = _query_ctx_map.erase(it);
                     } else {
                         ++it;
                     }
@@ -1240,9 +1233,7 @@ Status FragmentMgr::get_realtime_exec_status(const 
TUniqueId& query_id,
         return Status::NotFound("Query {} not found", print_id(query_id));
     }
 
-    if (query_context->enable_pipeline_x_exec()) {
-        *exec_status = query_context->get_realtime_exec_status();
-    }
+    *exec_status = query_context->get_realtime_exec_status();
 
     return Status::OK();
 }
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 44bdaa5971a..2dafb8dd3ec 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -393,10 +393,6 @@ std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
 QueryContext::_collect_realtime_query_profile() const {
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
res;
 
-    if (!enable_pipeline_x_exec()) {
-        return res;
-    }
-
     for (auto& [fragment_id, fragment_ctx_wptr] : 
_fragment_id_to_pipeline_ctx) {
         if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
             if (fragment_ctx == nullptr) {
@@ -429,25 +425,19 @@ QueryContext::_collect_realtime_query_profile() const {
 TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
     TReportExecStatusParams exec_status;
 
-    if (enable_pipeline_x_exec()) {
-        auto realtime_query_profile = _collect_realtime_query_profile();
-        std::vector<std::shared_ptr<TRuntimeProfileTree>> 
load_channel_profiles;
+    auto realtime_query_profile = _collect_realtime_query_profile();
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
 
-        for (auto load_channel_profile : _load_channel_profile_map) {
-            if (load_channel_profile.second != nullptr) {
-                load_channel_profiles.push_back(load_channel_profile.second);
-            }
+    for (auto load_channel_profile : _load_channel_profile_map) {
+        if (load_channel_profile.second != nullptr) {
+            load_channel_profiles.push_back(load_channel_profile.second);
         }
-
-        exec_status = 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
-                this->_query_id, std::move(realtime_query_profile),
-                std::move(load_channel_profiles), /*is_done=*/false);
-    } else {
-        auto msg = fmt::format("Query {} is not pipelineX query", 
print_id(_query_id));
-        LOG_ERROR(msg);
-        DCHECK(false) << msg;
     }
 
+    exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+            this->_query_id, std::move(realtime_query_profile), 
std::move(load_channel_profiles),
+            /*is_done=*/false);
+
     return exec_status;
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ee744a89466..4c1ee2cf574 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -167,13 +167,6 @@ public:
                _query_options.runtime_filter_wait_infinitely;
     }
 
-    bool enable_pipeline_x_exec() const {
-        return (_query_options.__isset.enable_pipeline_x_engine &&
-                _query_options.enable_pipeline_x_engine) ||
-               (_query_options.__isset.enable_pipeline_engine &&
-                _query_options.enable_pipeline_engine);
-    }
-
     int be_exec_version() const {
         if (!_query_options.__isset.be_exec_version) {
             return 0;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 104d0e342f7..c9812508446 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -500,7 +500,6 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(RuntimeState* sta
             state->get_query_ctx()->obj_pool.add(new 
RuntimeFilterParamsContext());
     params->runtime_filter_wait_infinitely = 
state->runtime_filter_wait_infinitely();
     params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
-    params->enable_pipeline_exec = state->enable_pipeline_x_exec();
     params->execution_timeout = state->execution_timeout();
     params->runtime_filter_mgr = state->local_runtime_filter_mgr();
     params->exec_env = state->exec_env();
@@ -516,7 +515,6 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(QueryContext* que
     RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new 
RuntimeFilterParamsContext());
     params->runtime_filter_wait_infinitely = 
query_ctx->runtime_filter_wait_infinitely();
     params->runtime_filter_wait_time_ms = 
query_ctx->runtime_filter_wait_time_ms();
-    params->enable_pipeline_exec = query_ctx->enable_pipeline_x_exec();
     params->execution_timeout = query_ctx->execution_timeout();
     params->runtime_filter_mgr = query_ctx->runtime_filter_mgr();
     params->exec_env = query_ctx->exec_env();
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 706e5ae5e31..fb0970a541d 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -282,7 +282,6 @@ struct RuntimeFilterParamsContext {
 
     bool runtime_filter_wait_infinitely;
     int32_t runtime_filter_wait_time_ms;
-    bool enable_pipeline_exec;
     int32_t execution_timeout;
     RuntimeFilterMgr* runtime_filter_mgr;
     ExecEnv* exec_env;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 2b303603e7a..4df2b0a45a5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -355,12 +355,6 @@ public:
         }
         return _query_options.be_exec_version;
     }
-    bool enable_pipeline_x_exec() const {
-        return (_query_options.__isset.enable_pipeline_x_engine &&
-                _query_options.enable_pipeline_x_engine) ||
-               (_query_options.__isset.enable_pipeline_engine &&
-                _query_options.enable_pipeline_engine);
-    }
     bool enable_local_shuffle() const {
         return _query_options.__isset.enable_local_shuffle && 
_query_options.enable_local_shuffle;
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 7d0e131f5b7..1eaed2a62e4 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -342,8 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
           _row_desc(row_desc),
           _is_merging(is_merging),
           _is_closed(false),
-          _profile(profile),
-          _enable_pipeline(state->enable_pipeline_x_exec()) {
+          _profile(profile) {
     // DataStreamRecvr may be destructed after the instance execution thread ends.
     _mem_tracker =
             std::make_unique<MemTracker>("VDataStreamRecvr:" + 
print_id(_fragment_instance_id));
@@ -351,26 +350,18 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
 
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
-    if (state->enable_pipeline_x_exec()) {
-        _sender_to_local_channel_dependency.resize(num_queues);
-        for (size_t i = 0; i < num_queues; i++) {
-            _sender_to_local_channel_dependency[i] = 
pipeline::Dependency::create_shared(
-                    _dest_node_id, _dest_node_id, 
"LocalExchangeChannelDependency", true);
-        }
+    _sender_to_local_channel_dependency.resize(num_queues);
+    for (size_t i = 0; i < num_queues; i++) {
+        _sender_to_local_channel_dependency[i] = 
pipeline::Dependency::create_shared(
+                _dest_node_id, _dest_node_id, 
"LocalExchangeChannelDependency", true);
     }
     _sender_queues.reserve(num_queues);
     int num_sender_per_queue = is_merging ? 1 : num_senders;
     _sender_queue_mem_limit = std::max(20480, 
config::exchg_node_buffer_size_bytes / num_queues);
     for (int i = 0; i < num_queues; ++i) {
         SenderQueue* queue = nullptr;
-        if (_enable_pipeline) {
-            queue = _sender_queue_pool.add(new PipSenderQueue(this, 
num_sender_per_queue, profile));
-            if (state->enable_pipeline_x_exec()) {
-                
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
-            }
-        } else {
-            queue = _sender_queue_pool.add(new SenderQueue(this, 
num_sender_per_queue, profile));
-        }
+        queue = _sender_queue_pool.add(new PipSenderQueue(this, 
num_sender_per_queue, profile));
+        
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
         _sender_queues.push_back(queue);
     }
 
@@ -411,7 +402,6 @@ Status VDataStreamRecvr::create_merger(const 
VExprContextSPtrs& ordering_expr,
                                                      _sender_queues[i], 
std::placeholders::_1,
                                                      std::placeholders::_2));
     }
-    _merger->set_pipeline_engine_enabled(_enable_pipeline);
     RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 310b9ced5dc..e89eb7ba824 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -167,7 +167,6 @@ private:
     // Number of blocks received
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
 
-    bool _enable_pipeline;
     std::vector<std::shared_ptr<pipeline::Dependency>> 
_sender_to_local_channel_dependency;
 };
 
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 3b17f957deb..ef054190a3b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -119,12 +119,9 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
         while (_offset != 0 && current->block_ptr() != nullptr) {
             if (_offset >= current->rows - current->pos) {
                 _offset -= (current->rows - current->pos);
-                if (_pipeline_engine_enabled) {
-                    _pending_cursor = current.impl;
-                    _priority_queue.pop();
-                    return Status::OK();
-                }
-                has_next_block(current);
+                _pending_cursor = current.impl;
+                _priority_queue.pop();
+                return Status::OK();
             } else {
                 current->pos += _offset;
                 _offset = 0;
@@ -134,12 +131,9 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
         if (current->is_first()) {
             if (current->block_ptr() != nullptr) {
                 current->block_ptr()->swap(*output_block);
-                if (_pipeline_engine_enabled) {
-                    _pending_cursor = current.impl;
-                    _priority_queue.pop();
-                    return Status::OK();
-                }
-                *eos = !has_next_block(current);
+                _pending_cursor = current.impl;
+                _priority_queue.pop();
+                return Status::OK();
             } else {
                 *eos = true;
             }
@@ -151,12 +145,9 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
                             current->pos, current->rows - current->pos);
                 }
                 current->block_ptr()->swap(*output_block);
-                if (_pipeline_engine_enabled) {
-                    _pending_cursor = current.impl;
-                    _priority_queue.pop();
-                    return Status::OK();
-                }
-                *eos = !has_next_block(current);
+                _pending_cursor = current.impl;
+                _priority_queue.pop();
+                return Status::OK();
             } else {
                 *eos = true;
             }
diff --git a/be/src/vec/runtime/vsorted_run_merger.h 
b/be/src/vec/runtime/vsorted_run_merger.h
index 943956d8c38..8dd706cad16 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -62,8 +62,6 @@ public:
     // Return the next block of sorted rows from this merger.
     Status get_next(Block* output_block, bool* eos);
 
-    void set_pipeline_engine_enabled(bool value) { _pipeline_engine_enabled = 
value; }
-
 protected:
     const VExprContextSPtrs _ordering_expr;
     SortDescription _desc;
@@ -76,8 +74,6 @@ protected:
     int64_t _limit = -1;
     size_t _offset = 0;
 
-    bool _pipeline_engine_enabled = false;
-
     std::vector<BlockSupplierSortCursorImpl> _cursors;
     std::priority_queue<MergeSortCursor> _priority_queue;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to