This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git.
The following commit(s) were added to refs/heads/master by this push: new b3f69211b73 [refactor](wal) move group commit load content length to runtime state (#29188) b3f69211b73 is described below commit b3f69211b7313208b74fd00f664b1a08a2bea6f7 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Tue Jan 2 15:53:38 2024 +0800 [refactor](wal) move group commit load content length to runtime state (#29188) --- be/src/http/action/http_stream.cpp | 3 +- be/src/http/action/stream_load.cpp | 3 +- be/src/runtime/group_commit_mgr.cpp | 48 ++--------------------------- be/src/runtime/group_commit_mgr.h | 8 +---- be/src/runtime/plan_fragment_executor.cpp | 3 ++ be/src/runtime/runtime_state.h | 7 ++++- be/src/vec/sink/group_commit_block_sink.cpp | 17 +++++++--- be/src/vec/sink/group_commit_block_sink.h | 1 + gensrc/thrift/PaloInternalService.thrift | 2 ++ 9 files changed, 31 insertions(+), 61 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index a9652707834..b97ce2976eb 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -345,8 +345,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { content_length *= 3; } - RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info( - ctx->id.to_thrift(), content_length)); + ctx->put_result.params.__set_content_length(content_length); } } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 6f6b7ad6b03..88e12e19dca 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -644,8 +644,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { content_length *= 3; } - RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info( - ctx->id.to_thrift(), content_length)); + 
ctx->put_result.params.__set_content_length(content_length); } } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 69f150b6c60..0c3589edbe0 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -498,33 +498,17 @@ Status LoadBlockQueue::close_wal() { return Status::OK(); } -bool LoadBlockQueue::has_enough_wal_disk_space( - const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id, - bool is_blocks_contain_all_load_data) { - size_t blocks_size = 0; - for (auto block : blocks) { - blocks_size += block->bytes(); - } - size_t content_length = 0; - Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length); - if (st.ok()) { - RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id)); - } else { - return Status::InternalError("can not find load id."); - } - size_t pre_allocated = is_blocks_contain_all_load_data - ? blocks_size - : (blocks_size > content_length ? 
blocks_size : content_length); +bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) { auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); size_t available_bytes = 0; { - st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes); + Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes); if (!st.ok()) { LOG(WARNING) << "get wal disk available size filed!"; } } if (pre_allocated < available_bytes) { - st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true); + Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true); if (!st.ok()) { LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string(); } @@ -534,30 +518,4 @@ bool LoadBlockQueue::has_enough_wal_disk_space( return false; } } - -Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) { - std::unique_lock l(_load_info_lock); - if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) { - _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length)); - } - return Status::OK(); -} - -Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) { - std::shared_lock l(_load_info_lock); - if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) { - *content_length = _load_id_to_content_length_map[load_id]; - return Status::OK(); - } - return Status::InternalError("can not find load id!"); -} - -Status GroupCommitMgr::remove_load_info(TUniqueId load_id) { - std::unique_lock l(_load_info_lock); - if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) { - return Status::InternalError("can not remove load id!"); - } - _load_id_to_content_length_map.erase(load_id); - return Status::OK(); -} } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index bf89aa2aa50..125256535fe 100644 --- 
a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -66,8 +66,7 @@ public: WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc, int be_exe_version); Status close_wal(); - bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks, - const TUniqueId& load_id, bool is_blocks_contain_all_load_data); + bool has_enough_wal_disk_space(size_t pre_allocated); // 1s static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; @@ -157,9 +156,6 @@ public: const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version); - Status update_load_info(TUniqueId load_id, size_t content_length); - Status get_load_info(TUniqueId load_id, size_t* content_length); - Status remove_load_info(TUniqueId load_id); private: ExecEnv* _exec_env = nullptr; @@ -170,8 +166,6 @@ private: std::unique_ptr<doris::ThreadPool> _thread_pool; // memory consumption of all tables' load block queues, used for back pressure. std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes; - std::shared_mutex _load_info_lock; - std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index dcbf6346fdd..896178946e3 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -149,6 +149,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (request.__isset.wal_id) { _runtime_state->set_wal_id(request.wal_id); } + if (request.__isset.content_length) { + _runtime_state->set_content_length(request.content_length); + } if (request.query_options.__isset.is_report_success) { _is_report_success = request.query_options.is_report_success; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ee515007eb6..1470ec89776 100644 --- 
a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -255,7 +255,11 @@ public: void set_wal_id(int64_t wal_id) { _wal_id = wal_id; } - int64_t wal_id() { return _wal_id; } + int64_t wal_id() const { return _wal_id; } + + void set_content_length(size_t content_length) { _content_length = content_length; } + + size_t content_length() const { return _content_length; } const std::string& import_label() { return _import_label; } @@ -659,6 +663,7 @@ private: std::string _load_dir; int64_t _load_job_id; int64_t _wal_id = -1; + size_t _content_length = 0; // mini load int64_t _normal_row_number; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 277b9859bd5..75ffa13233e 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -66,8 +66,6 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { Status GroupCommitBlockSink::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSink::prepare(state)); - RETURN_IF_ERROR( - ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0)); _state = state; // profile must add to state's object pool @@ -240,8 +238,8 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, _state->be_exec_version())); if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) { - _group_commit_mode = _load_block_queue->has_enough_wal_disk_space( - _blocks, load_id, is_blocks_contain_all_load_data) + size_t pre_allocated = _pre_allocated(is_blocks_contain_all_load_data); + _group_commit_mode = _load_block_queue->has_enough_wal_disk_space(pre_allocated) ? 
TGroupCommitMode::ASYNC_MODE : TGroupCommitMode::SYNC_MODE; if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) { @@ -265,5 +263,16 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* state, return Status::OK(); } +size_t GroupCommitBlockSink::_pre_allocated(bool is_blocks_contain_all_load_data) { + size_t blocks_size = 0; + for (auto block : _blocks) { + blocks_size += block->bytes(); + } + return is_blocks_contain_all_load_data + ? blocks_size + : (blocks_size > _state->content_length() ? blocks_size + : _state->content_length()); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index 3db4bdd31f8..9a57cb594e7 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -48,6 +48,7 @@ public: private: Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block); Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data); + size_t _pre_allocated(bool is_blocks_contain_all_load_data); vectorized::VExprContextSPtrs _output_vexpr_ctxs; diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e07797b593d..8700a50790f 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -477,6 +477,8 @@ struct TExecPlanFragmentParams { 27: optional i32 total_load_streams 28: optional i32 num_local_sink + + 29: optional i64 content_length } struct TExecPlanFragmentParamsList { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org