This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b3f69211b73 [refactor](wal) move group commit load content length to runtime state (#29188)
b3f69211b73 is described below

commit b3f69211b7313208b74fd00f664b1a08a2bea6f7
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Tue Jan 2 15:53:38 2024 +0800

    [refactor](wal) move group commit load content length to runtime state (#29188)
---
 be/src/http/action/http_stream.cpp          |  3 +-
 be/src/http/action/stream_load.cpp          |  3 +-
 be/src/runtime/group_commit_mgr.cpp         | 48 ++---------------------------
 be/src/runtime/group_commit_mgr.h           |  8 +----
 be/src/runtime/plan_fragment_executor.cpp   |  3 ++
 be/src/runtime/runtime_state.h              |  7 ++++-
 be/src/vec/sink/group_commit_block_sink.cpp | 17 +++++++---
 be/src/vec/sink/group_commit_block_sink.h   |  1 +
 gensrc/thrift/PaloInternalService.thrift    |  2 ++
 9 files changed, 31 insertions(+), 61 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index a9652707834..b97ce2976eb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -345,8 +345,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
                 ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
                 content_length *= 3;
             }
-            
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
-                    ctx->id.to_thrift(), content_length));
+            ctx->put_result.params.__set_content_length(content_length);
         }
     }
 
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 6f6b7ad6b03..88e12e19dca 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -644,8 +644,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
                 ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
                 content_length *= 3;
             }
-            
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
-                    ctx->id.to_thrift(), content_length));
+            ctx->put_result.params.__set_content_length(content_length);
         }
     }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 69f150b6c60..0c3589edbe0 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -498,33 +498,17 @@ Status LoadBlockQueue::close_wal() {
     return Status::OK();
 }
 
-bool LoadBlockQueue::has_enough_wal_disk_space(
-        const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const 
TUniqueId& load_id,
-        bool is_blocks_contain_all_load_data) {
-    size_t blocks_size = 0;
-    for (auto block : blocks) {
-        blocks_size += block->bytes();
-    }
-    size_t content_length = 0;
-    Status st = 
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, 
&content_length);
-    if (st.ok()) {
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
-    } else {
-        return Status::InternalError("can not find load id.");
-    }
-    size_t pre_allocated = is_blocks_contain_all_load_data
-                                   ? blocks_size
-                                   : (blocks_size > content_length ? 
blocks_size : content_length);
+bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
     size_t available_bytes = 0;
     {
-        st = wal_mgr->get_wal_dir_available_size(wal_base_path, 
&available_bytes);
+        Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, 
&available_bytes);
         if (!st.ok()) {
             LOG(WARNING) << "get wal disk available size filed!";
         }
     }
     if (pre_allocated < available_bytes) {
-        st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, 
pre_allocated, true);
+        Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, 
pre_allocated, true);
         if (!st.ok()) {
             LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << 
st.to_string();
         }
@@ -534,30 +518,4 @@ bool LoadBlockQueue::has_enough_wal_disk_space(
         return false;
     }
 }
-
-Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t 
content_length) {
-    std::unique_lock l(_load_info_lock);
-    if (_load_id_to_content_length_map.find(load_id) == 
_load_id_to_content_length_map.end()) {
-        _load_id_to_content_length_map.insert(std::make_pair(load_id, 
content_length));
-    }
-    return Status::OK();
-}
-
-Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* 
content_length) {
-    std::shared_lock l(_load_info_lock);
-    if (_load_id_to_content_length_map.find(load_id) != 
_load_id_to_content_length_map.end()) {
-        *content_length = _load_id_to_content_length_map[load_id];
-        return Status::OK();
-    }
-    return Status::InternalError("can not find load id!");
-}
-
-Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
-    std::unique_lock l(_load_info_lock);
-    if (_load_id_to_content_length_map.find(load_id) == 
_load_id_to_content_length_map.end()) {
-        return Status::InternalError("can not remove load id!");
-    }
-    _load_id_to_content_length_map.erase(load_id);
-    return Status::OK();
-}
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index bf89aa2aa50..125256535fe 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -66,8 +66,7 @@ public:
                       WalManager* wal_manager, std::vector<TSlotDescriptor>& 
slot_desc,
                       int be_exe_version);
     Status close_wal();
-    bool has_enough_wal_disk_space(const 
std::vector<std::shared_ptr<vectorized::Block>>& blocks,
-                                   const TUniqueId& load_id, bool 
is_blocks_contain_all_load_data);
+    bool has_enough_wal_disk_space(size_t pre_allocated);
 
     // 1s
     static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
@@ -157,9 +156,6 @@ public:
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                       int be_exe_version);
-    Status update_load_info(TUniqueId load_id, size_t content_length);
-    Status get_load_info(TUniqueId load_id, size_t* content_length);
-    Status remove_load_info(TUniqueId load_id);
 
 private:
     ExecEnv* _exec_env = nullptr;
@@ -170,8 +166,6 @@ private:
     std::unique_ptr<doris::ThreadPool> _thread_pool;
     // memory consumption of all tables' load block queues, used for back 
pressure.
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    std::shared_mutex _load_info_lock;
-    std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index dcbf6346fdd..896178946e3 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -149,6 +149,9 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
     if (request.__isset.wal_id) {
         _runtime_state->set_wal_id(request.wal_id);
     }
+    if (request.__isset.content_length) {
+        _runtime_state->set_content_length(request.content_length);
+    }
 
     if (request.query_options.__isset.is_report_success) {
         _is_report_success = request.query_options.is_report_success;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ee515007eb6..1470ec89776 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -255,7 +255,11 @@ public:
 
     void set_wal_id(int64_t wal_id) { _wal_id = wal_id; }
 
-    int64_t wal_id() { return _wal_id; }
+    int64_t wal_id() const { return _wal_id; }
+
+    void set_content_length(size_t content_length) { _content_length = 
content_length; }
+
+    size_t content_length() const { return _content_length; }
 
     const std::string& import_label() { return _import_label; }
 
@@ -659,6 +663,7 @@ private:
     std::string _load_dir;
     int64_t _load_job_id;
     int64_t _wal_id = -1;
+    size_t _content_length = 0;
 
     // mini load
     int64_t _normal_row_number;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 277b9859bd5..75ffa13233e 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -66,8 +66,6 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
 
 Status GroupCommitBlockSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
-    RETURN_IF_ERROR(
-            
ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(),
 0));
     _state = state;
 
     // profile must add to state's object pool
@@ -240,8 +238,8 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* 
state,
                     _db_id, _table_id, _base_schema_version, load_id, 
_load_block_queue,
                     _state->be_exec_version()));
             if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
-                _group_commit_mode = 
_load_block_queue->has_enough_wal_disk_space(
-                                             _blocks, load_id, 
is_blocks_contain_all_load_data)
+                size_t pre_allocated = 
_pre_allocated(is_blocks_contain_all_load_data);
+                _group_commit_mode = 
_load_block_queue->has_enough_wal_disk_space(pre_allocated)
                                              ? TGroupCommitMode::ASYNC_MODE
                                              : TGroupCommitMode::SYNC_MODE;
                 if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
@@ -265,5 +263,16 @@ Status GroupCommitBlockSink::_add_blocks(RuntimeState* 
state,
     return Status::OK();
 }
 
+size_t GroupCommitBlockSink::_pre_allocated(bool 
is_blocks_contain_all_load_data) {
+    size_t blocks_size = 0;
+    for (auto block : _blocks) {
+        blocks_size += block->bytes();
+    }
+    return is_blocks_contain_all_load_data
+                   ? blocks_size
+                   : (blocks_size > _state->content_length() ? blocks_size
+                                                             : 
_state->content_length());
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 3db4bdd31f8..9a57cb594e7 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -48,6 +48,7 @@ public:
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> 
block);
     Status _add_blocks(RuntimeState* state, bool 
is_blocks_contain_all_load_data);
+    size_t _pre_allocated(bool is_blocks_contain_all_load_data);
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e07797b593d..8700a50790f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -477,6 +477,8 @@ struct TExecPlanFragmentParams {
   27: optional i32 total_load_streams
 
   28: optional i32 num_local_sink
+
+  29: optional i64 content_length
 }
 
 struct TExecPlanFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to