This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 46457bcecd4 Revert "[Improvement](sink) optimization for parallel result sink (#3… (#36628) 46457bcecd4 is described below commit 46457bcecd4dcd1080310546ddf618ff1b4fb0f5 Author: Pxl <pxl...@qq.com> AuthorDate: Fri Jun 21 09:51:42 2024 +0800 Revert "[Improvement](sink) optimization for parallel result sink (#3… (#36628) …6305)" This reverts commit fdb5891c3eccefad7a354436dfb0eae82da5bd6e. --- be/src/pipeline/exec/result_file_sink_operator.cpp | 5 +- be/src/pipeline/exec/result_file_sink_operator.h | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 13 +- be/src/pipeline/exec/result_sink_operator.h | 2 +- be/src/pipeline/local_exchange/local_exchanger.cpp | 22 +- be/src/runtime/buffer_control_block.cpp | 258 +++++++++++---------- be/src/runtime/buffer_control_block.h | 33 ++- be/src/runtime/result_buffer_mgr.cpp | 6 +- be/src/runtime/result_buffer_mgr.h | 3 +- be/src/runtime/result_writer.h | 2 +- be/src/service/point_query_executor.cpp | 14 +- be/src/service/point_query_executor.h | 2 +- be/src/vec/sink/varrow_flight_result_writer.cpp | 4 +- be/src/vec/sink/varrow_flight_result_writer.h | 2 +- be/src/vec/sink/vmysql_result_writer.cpp | 4 +- be/src/vec/sink/vmysql_result_writer.h | 2 +- be/src/vec/sink/writer/async_result_writer.cpp | 2 +- .../sink/writer/iceberg/viceberg_table_writer.cpp | 2 +- .../sink/writer/iceberg/viceberg_table_writer.h | 2 +- be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +- be/src/vec/sink/writer/vfile_result_writer.h | 2 +- be/src/vec/sink/writer/vhive_table_writer.cpp | 2 +- be/src/vec/sink/writer/vhive_table_writer.h | 4 +- be/src/vec/sink/writer/vjdbc_table_writer.cpp | 2 +- be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +- be/src/vec/sink/writer/vmysql_table_writer.cpp | 2 +- be/src/vec/sink/writer/vmysql_table_writer.h | 2 +- be/src/vec/sink/writer/vodbc_table_writer.cpp | 2 +- be/src/vec/sink/writer/vodbc_table_writer.h | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 4 +- be/src/vec/sink/writer/vtablet_writer.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- .../serde/data_type_serde_mysql_test.cpp | 2 +- 34 files changed, 213 insertions(+), 206 deletions(-) diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 029bea7494e..0cd14899f52 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -99,8 +99,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i if (p._is_top_sink) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(), - state->batch_size())); + state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout())); // create writer _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( p._file_opts.get(), p._storage_type, state->fragment_instance_id(), @@ -176,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) // close sender, this is normal path end if (_sender) { _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); + RETURN_IF_ERROR(_sender->close(final_status)); } state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 7623dae7fea..4fa31f615ce 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -107,7 +107,7 @@ private: // Owned by the RuntimeState. RowDescriptor _output_row_descriptor; - int _buf_size = 4096; // Allocated from _pool + int _buf_size = 1024; // Allocated from _pool bool _is_top_sink = true; std::string _header; std::string _header_type; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 378fea18eea..24c5162c4f4 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _sender = _parent->cast<ResultSinkOperatorX>()._sender; } else { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), - state->batch_size())); + state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender, + state->execution_timeout())); } - _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); + _sender->set_dependency(_dependency->shared_from_this()); return Status::OK(); } @@ -122,8 +122,7 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) { if (state->query_options().enable_parallel_result_sink) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), - state->batch_size())); + state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout())); } return Status::OK(); } @@ -140,7 +139,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); } - RETURN_IF_ERROR(local_state._writer->write(state, *block)); + RETURN_IF_ERROR(local_state._writer->write(*block)); if (_fetch_option.use_two_phase_fetch) { // Block structure may be changed by calling _second_phase_fetch_data(). // So we should clear block in case of unmatched columns @@ -186,7 +185,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_writer) { _sender->update_return_rows(_writer->get_written_rows()); } - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); + RETURN_IF_ERROR(_sender->close(final_status)); } state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 1d2490f486d..0ccb7f4946b 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -104,7 +104,7 @@ struct ResultFileOptions { } }; -constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8; +constexpr int RESULT_SINK_BUFFER_SIZE = 4096; class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> { ENABLE_FACTORY_CREATOR(ResultSinkLocalState); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index a8dc13438c1..51d2c8268e7 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -200,9 +200,8 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - size_t allocated_bytes = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); if (_data_queue[channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->add_mem_usage(channel_id, allocated_bytes); local_state._shared_state->set_ready_to_read(channel_id); } @@ -221,16 +220,25 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_running_sink_operators == 0) { + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + block->swap(next_block); + local_state._shared_state->sub_mem_usage(local_state._channel_id, + block->allocated_bytes()); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } + } else { + *eos = true; + } + } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - } else if (all_finished) { - *eos = true; + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index a1a83b22840..8ef23265e3f 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -31,7 +31,7 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" -#include "pipeline/dependency.h" +#include "pipeline/exec/result_sink_operator.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/thrift_util.h" @@ -85,14 +85,13 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size) +BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_rows(0), _buffer_limit(buffer_size), - _packet_num(0), - _batch_size(batch_size) { + _packet_num(0) { _query_statistics = std::make_unique<QueryStatistics>(); } @@ -104,153 +103,165 @@ Status BufferControlBlock::init() { return Status::OK(); } -Status BufferControlBlock::add_batch(RuntimeState* state, - std::unique_ptr<TFetchDataResult>& result) { - std::unique_lock<std::mutex> l(_lock); +Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result) { + { + std::unique_lock<std::mutex> l(_lock); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } + + int num_rows = result->result_batch.rows.size(); + + while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) && + !_is_cancelled) { + _data_removal.wait_for(l, std::chrono::seconds(1)); + } - int num_rows = result->result_batch.rows.size(); - if (_waiting_rpc.empty()) { - // Merge result into batch to reduce rpc times - if (!_fe_result_batch_queue.empty() && - ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < - _buffer_limit) && - !result->eos) { - std::vector<std::string>& back_rows = _fe_result_batch_queue.back()->result_batch.rows; - std::vector<std::string>& result_rows = result->result_batch.rows; - back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), - std::make_move_iterator(result_rows.end())); + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } + + if (_waiting_rpc.empty()) { + // Merge result into batch to reduce rpc times + if (!_fe_result_batch_queue.empty() && + ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < + _buffer_limit) && + !result->eos) { + std::vector<std::string>& back_rows = + _fe_result_batch_queue.back()->result_batch.rows; + std::vector<std::string>& result_rows = result->result_batch.rows; + back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), + std::make_move_iterator(result_rows.end())); + } else { + _fe_result_batch_queue.push_back(std::move(result)); + } + _buffer_rows += num_rows; } else { - _instance_rows_in_queue.emplace_back(); - _fe_result_batch_queue.push_back(std::move(result)); + auto* ctx = _waiting_rpc.front(); + _waiting_rpc.pop_front(); + ctx->on_data(result, _packet_num); + _packet_num++; } - _buffer_rows += num_rows; - _instance_rows[state->fragment_instance_id()] += num_rows; - _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; - } else { - auto* ctx = _waiting_rpc.front(); - _waiting_rpc.pop_front(); - ctx->on_data(result, _packet_num); - _packet_num++; } - _update_dependency(); return Status::OK(); } -Status BufferControlBlock::add_arrow_batch(RuntimeState* state, - std::shared_ptr<arrow::RecordBatch>& result) { - std::unique_lock<std::mutex> l(_lock); +Status BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) { + { + std::unique_lock<std::mutex> l(_lock); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - int num_rows = result->num_rows(); + int num_rows = result->num_rows(); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && + !_is_cancelled) { + _data_removal.wait_for(l, std::chrono::seconds(1)); + } - // TODO: merge RocordBatch, ToStructArray -> Make again + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - _arrow_flight_batch_queue.push_back(std::move(result)); - _buffer_rows += num_rows; - _instance_rows_in_queue.emplace_back(); - _instance_rows[state->fragment_instance_id()] += num_rows; - _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + // TODO: merge RocordBatch, ToStructArray -> Make again + + _arrow_flight_batch_queue.push_back(std::move(result)); + _buffer_rows += num_rows; + _data_arrival.notify_one(); + } _update_dependency(); return Status::OK(); } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { - std::lock_guard<std::mutex> l(_lock); - if (!_status.ok()) { - ctx->on_failure(_status); - _update_dependency(); - return; - } - if (_is_cancelled) { - ctx->on_failure(Status::Cancelled("Cancelled")); - _update_dependency(); - return; - } - if (!_fe_result_batch_queue.empty()) { - // get result - std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front()); - _fe_result_batch_queue.pop_front(); - _buffer_rows -= result->result_batch.rows.size(); - for (auto it : _instance_rows_in_queue.front()) { - _instance_rows[it.first] -= it.second; + { + std::lock_guard<std::mutex> l(_lock); + if (!_status.ok()) { + ctx->on_failure(_status); + _update_dependency(); + return; } - _instance_rows_in_queue.pop_front(); - - ctx->on_data(result, _packet_num); - _packet_num++; - _update_dependency(); - return; - } - if (_is_close) { - ctx->on_close(_packet_num, _query_statistics.get()); - _update_dependency(); - return; + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled("Cancelled")); + _update_dependency(); + return; + } + if (!_fe_result_batch_queue.empty()) { + // get result + std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front()); + _fe_result_batch_queue.pop_front(); + _buffer_rows -= result->result_batch.rows.size(); + _data_removal.notify_one(); + + ctx->on_data(result, _packet_num); + _packet_num++; + _update_dependency(); + return; + } + if (_is_close) { + ctx->on_close(_packet_num, _query_statistics.get()); + _update_dependency(); + return; + } + // no ready data, push ctx to waiting list + _waiting_rpc.push_back(ctx); } - // no ready data, push ctx to waiting list - _waiting_rpc.push_back(ctx); _update_dependency(); } Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) { - std::unique_lock<std::mutex> l(_lock); - if (!_status.ok()) { - return _status; - } - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + { + std::unique_lock<std::mutex> l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { + _data_arrival.wait_for(l, std::chrono::seconds(1)); + } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); - _buffer_rows -= (*result)->num_rows(); - for (auto it : _instance_rows_in_queue.front()) { - _instance_rows[it.first] -= it.second; + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); } - _instance_rows_in_queue.pop_front(); - _packet_num++; - _update_dependency(); - return Status::OK(); - } - // normal path end - if (_is_close) { - _update_dependency(); - return Status::OK(); + if (!_arrow_flight_batch_queue.empty()) { + *result = std::move(_arrow_flight_batch_queue.front()); + _arrow_flight_batch_queue.pop_front(); + _buffer_rows -= (*result)->num_rows(); + _data_removal.notify_one(); + _packet_num++; + _update_dependency(); + return Status::OK(); + } + + // normal path end + if (_is_close) { + _update_dependency(); + return Status::OK(); + } } return Status::InternalError("Abnormal Ending"); } -Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { +Status BufferControlBlock::close(Status exec_status) { std::unique_lock<std::mutex> l(_lock); - auto it = _result_sink_dependencys.find(id); - if (it != _result_sink_dependencys.end()) { - it->second->set_always_ready(); - _result_sink_dependencys.erase(it); - } - if (!_result_sink_dependencys.empty()) { + close_cnt++; + if (close_cnt < _result_sink_dependencys.size()) { return Status::OK(); } _is_close = true; _status = exec_status; + // notify blocked get thread + _data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { @@ -269,6 +280,8 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { void BufferControlBlock::cancel() { std::unique_lock<std::mutex> l(_lock); _is_cancelled = true; + _data_removal.notify_all(); + _data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } @@ -277,25 +290,18 @@ void BufferControlBlock::cancel() { } void BufferControlBlock::set_dependency( - const TUniqueId& id, std::shared_ptr<pipeline::Dependency> result_sink_dependency) { - std::unique_lock<std::mutex> l(_lock); - _result_sink_dependencys[id] = result_sink_dependency; - _update_dependency(); + std::shared_ptr<pipeline::Dependency> result_sink_dependency) { + _result_sink_dependencys.push_back(result_sink_dependency); } void BufferControlBlock::_update_dependency() { - if (_is_cancelled) { - for (auto it : _result_sink_dependencys) { - it.second->set_ready(); + if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) { + for (auto dependency : _result_sink_dependencys) { + dependency->set_ready(); } - return; - } - - for (auto it : _result_sink_dependencys) { - if (_instance_rows[it.first] > _batch_size) { - it.second->block(); - } else { - it.second->set_ready(); + } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled) { + for (auto dependency : _result_sink_dependencys) { + dependency->block(); } } } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 1296f2c606b..c8c240f928a 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -27,16 +27,15 @@ #include <list> #include <memory> #include <mutex> -#include <unordered_map> #include "common/status.h" #include "runtime/query_statistics.h" -#include "runtime/runtime_state.h" -#include "util/hash_util.hpp" -namespace google::protobuf { +namespace google { +namespace protobuf { class Closure; -} // namespace google::protobuf +} +} // namespace google namespace arrow { class RecordBatch; @@ -72,19 +71,19 @@ struct GetResultBatchCtx { // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size); + BufferControlBlock(const TUniqueId& id, int buffer_size); ~BufferControlBlock(); Status init(); - Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& result); - Status add_arrow_batch(RuntimeState* state, std::shared_ptr<arrow::RecordBatch>& result); + Status add_batch(std::unique_ptr<TFetchDataResult>& result); + Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result); void get_batch(GetResultBatchCtx* ctx); Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. - Status close(const TUniqueId& id, Status exec_status); + Status close(Status exec_status); // this is called by RPC, called from coordinator void cancel(); @@ -99,8 +98,7 @@ public: } } - void set_dependency(const TUniqueId& id, - std::shared_ptr<pipeline::Dependency> result_sink_dependency); + void set_dependency(std::shared_ptr<pipeline::Dependency> result_sink_dependency); protected: void _update_dependency(); @@ -123,17 +121,18 @@ protected: // protects all subsequent data in this block std::mutex _lock; + // signal arrival of new batch or the eos/cancelled condition + std::condition_variable _data_arrival; + // signal removal of data by stream consumer + std::condition_variable _data_removal; std::deque<GetResultBatchCtx*> _waiting_rpc; // only used for FE using return rows to check limit std::unique_ptr<QueryStatistics> _query_statistics; - // instance id to dependency - std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys; - std::unordered_map<TUniqueId, size_t> _instance_rows; - std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue; - - int _batch_size; + std::atomic_bool _batch_queue_empty = false; + std::vector<std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys; + size_t close_cnt = 0; }; } // namespace doris diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index ccbf0c3ff67..23f440d1909 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -67,8 +67,8 @@ Status ResultBufferMgr::init() { } Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr<BufferControlBlock>* sender, int exec_timout, - int batch_size) { + std::shared_ptr<BufferControlBlock>* sender, + int exec_timout) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr<BufferControlBlock> control_block = nullptr; - control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size, batch_size); + control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size); { std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8bac69c23ac..30b1b61eb7d 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -58,8 +58,7 @@ public: // the returned sender do not need release // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr<BufferControlBlock>* sender, int exec_timeout, - int batch_size); + std::shared_ptr<BufferControlBlock>* sender, int exec_timeout); // fetch data result to FE void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index df1b7a808d9..78082956d0e 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -47,7 +47,7 @@ public: [[nodiscard]] bool output_object_data() const { return _output_object_data; } // Write is sync, it will do real IO work. - virtual Status write(RuntimeState* state, vectorized::Block& block) = 0; + virtual Status write(vectorized::Block& block) = 0; void set_output_object_data(bool output_object_data) { _output_object_data = output_object_data; diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 8078467d5ca..d4d20ea5a48 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() { _reusable->get_col_default_values(), _reusable->include_col_uids()); } if (!_reusable->missing_col_uids().empty()) { - if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) { + if (!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) { std::string missing_columns; for (int cid : _reusable->missing_col_uids()) { missing_columns += _tablet->tablet_schema()->column_by_uid(cid).name() + ","; @@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() { } template <typename MysqlWriter> -Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer, vectorized::Block& block, - PTabletKeyLookupResponse* response) { +Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block, + PTabletKeyLookupResponse* response) { block.clear_names(); - RETURN_IF_ERROR(mysql_writer.write(state, block)); + RETURN_IF_ERROR(mysql_writer.write(block)); assert(mysql_writer.results().size() == 1); uint8_t* buf = nullptr; uint32_t len = 0; @@ -508,13 +508,11 @@ Status PointQueryExecutor::_output_data() { if (_binary_row_format) { vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, _reusable->output_exprs(), nullptr); - RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), mysql_writer, - *_result_block, _response)); + RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); } else { vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _reusable->output_exprs(), nullptr); - RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), mysql_writer, - *_result_block, _response)); + RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); } VLOG_DEBUG << "dump block " << _result_block->dump_data(); } else { diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index f374e094806..1bed53891c3 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -98,7 +98,7 @@ public: const std::unordered_set<int32_t> include_col_uids() const { return _include_col_uids; } - RuntimeState* runtime_state() { return _runtime_state.get(); } + const RuntimeState& runtime_state() const { return *_runtime_state; } private: // caching TupleDescriptor, output_expr, etc... diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index b23d1668465..d646cf66f34 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() { _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); } -Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) { +Status VArrowFlightResultWriter::write(Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { @@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) SCOPED_TIMER(_result_send_timer); // If this is a dry run task, no need to send data block if (!_is_dry_run) { - status = _sinker->add_arrow_batch(state, result); + status = _sinker->add_arrow_batch(result); } if (status.ok()) { _written_rows += num_rows; diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index ab2578421c8..862b074cb35 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -44,7 +44,7 @@ public: Status init(RuntimeState* state) override; - Status write(RuntimeState* state, Block& block) override; + Status write(Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 45941173d4d..804f50f0fc8 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() { } template <bool is_binary_format> -Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) { +Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { @@ -194,7 +194,7 @@ Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i // If this is a dry run task, no need to send data block if (!_is_dry_run) { if (_sinker) { - status = _sinker->add_batch(state, result); + status = _sinker->add_batch(result); } else { _results.push_back(std::move(result)); } diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index 306d062a6be..da3cdcf0690 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -47,7 +47,7 @@ public: Status init(RuntimeState* state) override; - Status write(RuntimeState* state, Block& block) override; + Status write(Block& block) override; Status close(Status status) override; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 42fd8468e86..814d1b754c4 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } auto block = _get_block_from_queue(); - auto status = write(state, *block); + auto status = write(*block); if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); _writer_status.update(status); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index fc8aacdbfa1..2703330406c 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() { return partition_columns; } -Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { +Status VIcebergTableWriter::write(vectorized::Block& block) { SCOPED_RAW_TIMER(&_send_data_ns); if (block.rows() == 0) { return Status::OK(); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h index e2e582e04ad..35e71d1960f 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h @@ -54,7 +54,7 @@ public: Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(RuntimeState* state, vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index c897892cbfc..96c4edc82b5 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() { } } -Status VFileResultWriter::write(RuntimeState* state, Block& block) { +Status VFileResultWriter::write(Block& block) { if (block.rows() == 0) { return Status::OK(); } @@ -291,8 +291,7 @@ Status VFileResultWriter::_send_result() { attach_infos.insert(std::make_pair("URL", file_url)); result->result_batch.__set_attached_infos(attach_infos); - RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result), - "failed to send outfile result"); + RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result"); return Status::OK(); } diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 42753a5e261..44b0695505f 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -60,7 +60,7 @@ public: VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status write(RuntimeState* state, Block& block) override; + Status write(Block& block) override; Status close(Status exec_status) override; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index f90c7134ccd..0e64060eb0b 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { return Status::OK(); } -Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { +Status VHiveTableWriter::write(vectorized::Block& block) { SCOPED_RAW_TIMER(&_send_data_ns); if (block.rows() == 0) { diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 6c8b972f280..4989ba443c7 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter { public: VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - ~VHiveTableWriter() override = default; + ~VHiveTableWriter() = default; Status init_properties(ObjectPool* pool); Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(RuntimeState* state, vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index b7c8d1f78dd..9493bfbf072 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {} -Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { +Status VJdbcTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index b8216c3bcd6..a683259c992 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -44,7 +44,7 @@ public: return init_to_write(profile); } - Status write(RuntimeState* state, vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); } diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index 45afe8ce019..d9ca6d96f99 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { return Status::OK(); } -Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) { +Status VMysqlTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h index 072885b176b..856d0a21ec5 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.h +++ b/be/src/vec/sink/writer/vmysql_table_writer.h @@ -51,7 +51,7 @@ public: // connect to mysql server Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(RuntimeState* state, vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp index c70bdd4ca19..da068c3d677 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp @@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {} -Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { +Status VOdbcTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index fa4dc47b77f..687b5106a8b 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -44,7 +44,7 @@ public: return init_to_write(profile); } - Status write(RuntimeState* state, vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index b31796fe724..2151696714b 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1368,7 +1368,7 @@ Status VTabletWriter::_send_new_partition_batch() { // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->write(_state, tmp_block)); + RETURN_IF_ERROR(this->write(tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); @@ -1675,7 +1675,7 @@ void VTabletWriter::_generate_index_channels_payloads( } } -Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input_block) { +Status VTabletWriter::write(doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index b9fbc4d0873..21d7b1c9f17 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter { public: VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status write(RuntimeState* state, Block& block) override; + Status write(Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 9bd154ce212..3c8dede657f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, return Status::OK(); } -Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) { +Status VTabletWriterV2::write(Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() { // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->write(_state, tmp_block)); + RETURN_IF_ERROR(this->write(tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 363dea54c3b..ff31e1552dd 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -106,7 +106,7 @@ public: ~VTabletWriterV2() override; - Status write(RuntimeState* state, Block& block) override; + Status write(Block& block) override; Status open(RuntimeState* state, RuntimeProfile* profile) override; diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp index 97e78f05c54..5ba8af8b81f 100644 --- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp @@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() { // mysql_writer init vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _output_vexpr_ctxs, nullptr); - Status st = mysql_writer.write(&runtime_stat, block); + Status st = mysql_writer.write(block); EXPECT_TRUE(st.ok()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org