This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b99add5fa [fix](cloud) provide a conf to enable/disable streamload 
commit on be  (#37855)
29b99add5fa is described below

commit 29b99add5fae9efe6fd9f6c49ddbae0339729dcf
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Wed Jul 17 17:32:25 2024 +0800

    [fix](cloud) provide a conf to enable/disable streamload commit on be  
(#37855)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Signed-off-by: freemandealer <freeman.zhang1...@gmail.com>
    Co-authored-by: freemandealer <freeman.zhang1...@gmail.com>
---
 be/src/cloud/cloud_stream_load_executor.cpp | 12 ++++++------
 be/src/common/config.cpp                    |  2 ++
 be/src/common/config.h                      |  2 ++
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_stream_load_executor.cpp 
b/be/src/cloud/cloud_stream_load_executor.cpp
index a87f37a5188..1b8167c96eb 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -60,7 +60,10 @@ Status 
CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
     Status st = Status::InternalError<false>("impossible branch reached, " + 
op_info);
 
     if (ctx->txn_operation.compare("commit") == 0) {
-        if (topt == TxnOpParamType::WITH_TXN_ID) {
+        if (!config::enable_stream_load_commit_txn_on_be) {
+            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << 
op_info;
+            st = StreamLoadExecutor::operate_txn_2pc(ctx);
+        } else if (topt == TxnOpParamType::WITH_TXN_ID) {
             VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
             st = 
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
         } else if (topt == TxnOpParamType::WITH_LABEL) {
@@ -93,12 +96,9 @@ Status 
CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
 }
 
 Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
-    if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
-        return StreamLoadExecutor::commit_txn(ctx);
-    }
-
     // forward to fe to excute commit transaction for MoW table
-    if (ctx->is_mow_table()) {
+    if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
+        ctx->load_type == TLoadType::ROUTINE_LOAD) {
         Status st;
         int retry_times = 0;
         while (retry_times < config::mow_stream_load_commit_retry_times) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c836bd3fb33..06156fd8598 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -541,6 +541,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
 DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
 // The buffer size to store stream table function schema info
 DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8860e40b7ef..f0d127df0fc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -595,6 +595,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
 DECLARE_Int32(stream_load_record_expire_time_secs);
 // time interval to clean expired stream load records
 DECLARE_mInt64(clean_stream_load_record_interval_secs);
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DECLARE_mBool(enable_stream_load_commit_txn_on_be);
 // The buffer size to store stream table function schema info
 DECLARE_Int64(stream_tvf_buffer_size);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to