This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3b0b440965c [refactor](pipeline) Delete pipeline option (#35943) 3b0b440965c is described below commit 3b0b440965cfd331df1f8c6b02aaa85b02dd73a4 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Jun 6 15:39:47 2024 +0800 [refactor](pipeline) Delete pipeline option (#35943) --- be/src/exprs/runtime_filter.cpp | 59 +++++++--------------- be/src/exprs/runtime_filter.h | 26 +++------- be/src/pipeline/exec/aggregation_sink_operator.cpp | 3 +- .../exec/streaming_aggregation_operator.cpp | 3 +- be/src/pipeline/local_exchange/local_exchanger.cpp | 1 - be/src/pipeline/pipeline_fragment_context.cpp | 4 -- be/src/runtime/fragment_mgr.cpp | 13 +---- be/src/runtime/query_context.cpp | 28 ++++------ be/src/runtime/query_context.h | 7 --- be/src/runtime/runtime_filter_mgr.cpp | 2 - be/src/runtime/runtime_filter_mgr.h | 1 - be/src/runtime/runtime_state.h | 6 --- be/src/vec/runtime/vdata_stream_recvr.cpp | 24 +++------ be/src/vec/runtime/vdata_stream_recvr.h | 1 - be/src/vec/runtime/vsorted_run_merger.cpp | 27 ++++------ be/src/vec/runtime/vsorted_run_merger.h | 4 -- 16 files changed, 55 insertions(+), 154 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index dc999ae137e..2a98be965f6 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1117,7 +1117,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { pfragment_instance_id->set_lo((int64_t)this); merge_filter_request->set_filter_id(_filter_id); - merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec); + merge_filter_request->set_is_pipeline(true); auto column_type = _wrapper->column_type(); merge_filter_request->set_column_type(to_proto(column_type)); merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); @@ -1170,35 +1170,21 @@ bool IRuntimeFilter::await() { int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER ? execution_timeout : runtime_filter_wait_time_ms; - if (_enable_pipeline_exec) { - auto expected = _rf_state_atomic.load(std::memory_order_acquire); - if (expected == RuntimeFilterState::NOT_READY) { - if (!_rf_state_atomic.compare_exchange_strong( - expected, - MonotonicMillis() - registration_time_ >= wait_times_ms - ? RuntimeFilterState::TIME_OUT - : RuntimeFilterState::NOT_READY, - std::memory_order_acq_rel)) { - DCHECK(expected == RuntimeFilterState::READY || - expected == RuntimeFilterState::TIME_OUT); - return (expected == RuntimeFilterState::READY); - } - return false; - } else if (expected == RuntimeFilterState::TIME_OUT) { - return false; - } - } else { - std::unique_lock lock(_inner_mutex); - if (_rf_state != RuntimeFilterState::READY) { - int64_t ms_since_registration = MonotonicMillis() - registration_time_; - int64_t ms_remaining = wait_times_ms - ms_since_registration; - _rf_state = RuntimeFilterState::TIME_OUT; - if (ms_remaining <= 0) { - return false; - } - return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining), - [this] { return _rf_state == RuntimeFilterState::READY; }); + auto expected = _rf_state_atomic.load(std::memory_order_acquire); + if (expected == RuntimeFilterState::NOT_READY) { + if (!_rf_state_atomic.compare_exchange_strong( + expected, + MonotonicMillis() - registration_time_ >= wait_times_ms + ? RuntimeFilterState::TIME_OUT + : RuntimeFilterState::NOT_READY, + std::memory_order_acq_rel)) { + DCHECK(expected == RuntimeFilterState::READY || + expected == RuntimeFilterState::TIME_OUT); + return (expected == RuntimeFilterState::READY); } + return false; + } else if (expected == RuntimeFilterState::TIME_OUT) { + return false; } return true; } @@ -1212,7 +1198,6 @@ void IRuntimeFilter::update_state() { ? execution_timeout : runtime_filter_wait_time_ms; auto expected = _rf_state_atomic.load(std::memory_order_acquire); - DCHECK(_enable_pipeline_exec); // In pipelineX, runtime filters will be ready or timeout before open phase. if (expected == RuntimeFilterState::NOT_READY) { DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); @@ -1234,17 +1219,11 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - if (_enable_pipeline_exec) { - _rf_state_atomic.store(RuntimeFilterState::READY); - if (!_filter_timer.empty()) { - for (auto& timer : _filter_timer) { - timer->call_ready(); - } + _rf_state_atomic.store(RuntimeFilterState::READY); + if (!_filter_timer.empty()) { + for (auto& timer : _filter_timer) { + timer->call_ready(); } - } else { - std::unique_lock lock(_inner_mutex); - _rf_state = RuntimeFilterState::READY; - _inner_cv.notify_all(); } if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 982a6f6eb8b..a78d732b687 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -207,7 +207,6 @@ public: registration_time_(MonotonicMillis()), _wait_infinitely(_state->runtime_filter_wait_infinitely), _rf_wait_time_ms(_state->runtime_filter_wait_time_ms), - _enable_pipeline_exec(_state->enable_pipeline_exec), _runtime_filter_type(get_runtime_filter_type(desc)), _profile( new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", @@ -247,12 +246,10 @@ public: bool has_local_target() const { return _has_local_target; } bool is_ready() const { - return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || - (_enable_pipeline_exec && - _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY); + return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY; } RuntimeFilterState current_state() const { - return _enable_pipeline_exec ? _rf_state_atomic.load(std::memory_order_acquire) : _rf_state; + return _rf_state_atomic.load(std::memory_order_acquire); } bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; } @@ -390,18 +387,11 @@ protected: void _set_push_down(bool push_down) { _is_push_down = push_down; } std::string _get_explain_state_string() const { - if (_enable_pipeline_exec) { - return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY - ? "READY" - : _rf_state_atomic.load(std::memory_order_acquire) == - RuntimeFilterState::TIME_OUT - ? "TIME_OUT" - : "NOT_READY"; - } else { - return _rf_state == RuntimeFilterState::READY ? "READY" - : _rf_state == RuntimeFilterState::TIME_OUT ? "TIME_OUT" - : "NOT_READY"; - } + return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY + ? "READY" + : _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::TIME_OUT + ? "TIME_OUT" + : "NOT_READY"; } RuntimeFilterParamsContext* _state = nullptr; @@ -436,8 +426,6 @@ protected: const bool _wait_infinitely; const int32_t _rf_wait_time_ms; - const bool _enable_pipeline_exec; - std::atomic<bool> _profile_init = false; // runtime filter type RuntimeFilterType _runtime_filter_type; diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 41498fd94fa..7a4a9d9c951 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -743,8 +743,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size()); // In case of : `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;` // only union with limit: we can short circuit query the pipeline exec engine. - _can_short_circuit = - tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_x_exec(); + _can_short_circuit = tnode.agg_node.aggregate_functions.empty(); TSortInfo dummy; for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 837a33dc437..40b63783c12 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1148,8 +1148,7 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size()); // In case of : `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;` // only union with limit: we can short circuit query the pipeline exec engine. - _can_short_circuit = - tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_x_exec(); + _can_short_circuit = tnode.agg_node.aggregate_functions.empty(); TSortInfo dummy; for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 980078b8fe8..a7c6446be43 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -311,7 +311,6 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* state, child_block_suppliers.push_back(block_supplier); } RETURN_IF_ERROR(_merger->prepare(child_block_suppliers)); - _merger->set_pipeline_engine_enabled(true); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a85b64c9154..72c06721d89 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -405,7 +405,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks( _runtime_state->runtime_filter_wait_infinitely(); filterparams->runtime_filter_wait_time_ms = _runtime_state->runtime_filter_wait_time_ms(); - filterparams->enable_pipeline_exec = _runtime_state->enable_pipeline_x_exec(); filterparams->execution_timeout = _runtime_state->execution_timeout(); filterparams->exec_env = ExecEnv::GetInstance(); @@ -1648,9 +1647,6 @@ std::string PipelineFragmentContext::debug_string() { std::vector<std::shared_ptr<TRuntimeProfileTree>> PipelineFragmentContext::collect_realtime_profile_x() const { std::vector<std::shared_ptr<TRuntimeProfileTree>> res; - DCHECK(_query_ctx->enable_pipeline_x_exec() == true) - << fmt::format("Query {} calling a pipeline X function, but its pipeline X is disabled", - print_id(this->_query_id)); // we do not have mutex to protect pipeline_id_to_profile // so we need to make sure this funciton is invoked after fragment context diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 41ba68e8cbb..8222b75cfc3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -710,10 +710,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<QueryContext> query_ctx; RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx)); SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id); - DCHECK((params.query_options.__isset.enable_pipeline_x_engine && - params.query_options.enable_pipeline_x_engine) || - (params.query_options.__isset.enable_pipeline_engine && - params.query_options.enable_pipeline_engine)); int64_t duration_ns = 0; std::shared_ptr<pipeline::PipelineFragmentContext> context = std::make_shared<pipeline::PipelineFragmentContext>( @@ -866,13 +862,10 @@ void FragmentMgr::cancel_worker() { } for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (auto q_ctx = it->second.lock()) { - if (q_ctx->is_timeout(now) && q_ctx->enable_pipeline_x_exec()) { + if (q_ctx->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); queries_timeout.push_back(it->first); ++it; - } else if (q_ctx->is_timeout(now)) { - LOG_WARNING("Query {} is timeout", print_id(it->first)); - it = _query_ctx_map.erase(it); } else { ++it; } @@ -1240,9 +1233,7 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, return Status::NotFound("Query {} not found", print_id(query_id)); } - if (query_context->enable_pipeline_x_exec()) { - *exec_status = query_context->get_realtime_exec_status(); - } + *exec_status = query_context->get_realtime_exec_status(); return Status::OK(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 44bdaa5971a..2dafb8dd3ec 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -393,10 +393,6 @@ std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> QueryContext::_collect_realtime_query_profile() const { std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res; - if (!enable_pipeline_x_exec()) { - return res; - } - for (auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) { if (auto fragment_ctx = fragment_ctx_wptr.lock()) { if (fragment_ctx == nullptr) { @@ -429,25 +425,19 @@ QueryContext::_collect_realtime_query_profile() const { TReportExecStatusParams QueryContext::get_realtime_exec_status() const { TReportExecStatusParams exec_status; - if (enable_pipeline_x_exec()) { - auto realtime_query_profile = _collect_realtime_query_profile(); - std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; + auto realtime_query_profile = _collect_realtime_query_profile(); + std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles; - for (auto load_channel_profile : _load_channel_profile_map) { - if (load_channel_profile.second != nullptr) { - load_channel_profiles.push_back(load_channel_profile.second); - } + for (auto load_channel_profile : _load_channel_profile_map) { + if (load_channel_profile.second != nullptr) { + load_channel_profiles.push_back(load_channel_profile.second); } - - exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params( - this->_query_id, std::move(realtime_query_profile), - std::move(load_channel_profiles), /*is_done=*/false); - } else { - auto msg = fmt::format("Query {} is not pipelineX query", print_id(_query_id)); - LOG_ERROR(msg); - DCHECK(false) << msg; } + exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params( + this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles), + /*is_done=*/false); + return exec_status; } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index ee744a89466..4c1ee2cf574 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -167,13 +167,6 @@ public: _query_options.runtime_filter_wait_infinitely; } - bool enable_pipeline_x_exec() const { - return (_query_options.__isset.enable_pipeline_x_engine && - _query_options.enable_pipeline_x_engine) || - (_query_options.__isset.enable_pipeline_engine && - _query_options.enable_pipeline_engine); - } - int be_exec_version() const { if (!_query_options.__isset.be_exec_version) { return 0; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 104d0e342f7..c9812508446 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -500,7 +500,6 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* sta state->get_query_ctx()->obj_pool.add(new RuntimeFilterParamsContext()); params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely(); params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms(); - params->enable_pipeline_exec = state->enable_pipeline_x_exec(); params->execution_timeout = state->execution_timeout(); params->runtime_filter_mgr = state->local_runtime_filter_mgr(); params->exec_env = state->exec_env(); @@ -516,7 +515,6 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* que RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new RuntimeFilterParamsContext()); params->runtime_filter_wait_infinitely = query_ctx->runtime_filter_wait_infinitely(); params->runtime_filter_wait_time_ms = query_ctx->runtime_filter_wait_time_ms(); - params->enable_pipeline_exec = query_ctx->enable_pipeline_x_exec(); params->execution_timeout = query_ctx->execution_timeout(); params->runtime_filter_mgr = query_ctx->runtime_filter_mgr(); params->exec_env = query_ctx->exec_env(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 706e5ae5e31..fb0970a541d 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -282,7 +282,6 @@ struct RuntimeFilterParamsContext { bool runtime_filter_wait_infinitely; int32_t runtime_filter_wait_time_ms; - bool enable_pipeline_exec; int32_t execution_timeout; RuntimeFilterMgr* runtime_filter_mgr; ExecEnv* exec_env; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 2b303603e7a..4df2b0a45a5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -355,12 +355,6 @@ public: } return _query_options.be_exec_version; } - bool enable_pipeline_x_exec() const { - return (_query_options.__isset.enable_pipeline_x_engine && - _query_options.enable_pipeline_x_engine) || - (_query_options.__isset.enable_pipeline_engine && - _query_options.enable_pipeline_engine); - } bool enable_local_shuffle() const { return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle; } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 7d0e131f5b7..1eaed2a62e4 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -342,8 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta _row_desc(row_desc), _is_merging(is_merging), _is_closed(false), - _profile(profile), - _enable_pipeline(state->enable_pipeline_x_exec()) { + _profile(profile) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id)); @@ -351,26 +350,18 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; - if (state->enable_pipeline_x_exec()) { - _sender_to_local_channel_dependency.resize(num_queues); - for (size_t i = 0; i < num_queues; i++) { - _sender_to_local_channel_dependency[i] = pipeline::Dependency::create_shared( - _dest_node_id, _dest_node_id, "LocalExchangeChannelDependency", true); - } + _sender_to_local_channel_dependency.resize(num_queues); + for (size_t i = 0; i < num_queues; i++) { + _sender_to_local_channel_dependency[i] = pipeline::Dependency::create_shared( + _dest_node_id, _dest_node_id, "LocalExchangeChannelDependency", true); } _sender_queues.reserve(num_queues); int num_sender_per_queue = is_merging ? 1 : num_senders; _sender_queue_mem_limit = std::max(20480, config::exchg_node_buffer_size_bytes / num_queues); for (int i = 0; i < num_queues; ++i) { SenderQueue* queue = nullptr; - if (_enable_pipeline) { - queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile)); - if (state->enable_pipeline_x_exec()) { - queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]); - } - } else { - queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile)); - } + queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile)); + queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]); _sender_queues.push_back(queue); } @@ -411,7 +402,6 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, _sender_queues[i], std::placeholders::_1, std::placeholders::_2)); } - _merger->set_pipeline_engine_enabled(_enable_pipeline); RETURN_IF_ERROR(_merger->prepare(child_block_suppliers)); return Status::OK(); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 310b9ced5dc..e89eb7ba824 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -167,7 +167,6 @@ private: // Number of blocks received RuntimeProfile::Counter* _blocks_produced_counter = nullptr; - bool _enable_pipeline; std::vector<std::shared_ptr<pipeline::Dependency>> _sender_to_local_channel_dependency; }; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 3b17f957deb..ef054190a3b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -119,12 +119,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { while (_offset != 0 && current->block_ptr() != nullptr) { if (_offset >= current->rows - current->pos) { _offset -= (current->rows - current->pos); - if (_pipeline_engine_enabled) { - _pending_cursor = current.impl; - _priority_queue.pop(); - return Status::OK(); - } - has_next_block(current); + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); } else { current->pos += _offset; _offset = 0; @@ -134,12 +131,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { if (current->is_first()) { if (current->block_ptr() != nullptr) { current->block_ptr()->swap(*output_block); - if (_pipeline_engine_enabled) { - _pending_cursor = current.impl; - _priority_queue.pop(); - return Status::OK(); - } - *eos = !has_next_block(current); + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); } else { *eos = true; } @@ -151,12 +145,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { current->pos, current->rows - current->pos); } current->block_ptr()->swap(*output_block); - if (_pipeline_engine_enabled) { - _pending_cursor = current.impl; - _priority_queue.pop(); - return Status::OK(); - } - *eos = !has_next_block(current); + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); } else { *eos = true; } diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 943956d8c38..8dd706cad16 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -62,8 +62,6 @@ public: // Return the next block of sorted rows from this merger. Status get_next(Block* output_block, bool* eos); - void set_pipeline_engine_enabled(bool value) { _pipeline_engine_enabled = value; } - protected: const VExprContextSPtrs _ordering_expr; SortDescription _desc; @@ -76,8 +74,6 @@ protected: int64_t _limit = -1; size_t _offset = 0; - bool _pipeline_engine_enabled = false; - std::vector<BlockSupplierSortCursorImpl> _cursors; std::priority_queue<MergeSortCursor> _priority_queue; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org