This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 29b99add5fa [fix](cloud) provide a conf to enable/disable streamload commit on be (#37855)
29b99add5fa is described below

commit 29b99add5fae9efe6fd9f6c49ddbae0339729dcf
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Wed Jul 17 17:32:25 2024 +0800

    [fix](cloud) provide a conf to enable/disable streamload commit on be (#37855)

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->

    ---------

    Signed-off-by: freemandealer <freeman.zhang1...@gmail.com>
    Co-authored-by: freemandealer <freeman.zhang1...@gmail.com>
---
 be/src/cloud/cloud_stream_load_executor.cpp | 12 ++++++------
 be/src/common/config.cpp                    |  2 ++
 be/src/common/config.h                      |  2 ++
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp
index a87f37a5188..1b8167c96eb 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -60,7 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
     Status st = Status::InternalError<false>("impossible branch reached, " + op_info);

     if (ctx->txn_operation.compare("commit") == 0) {
-        if (topt == TxnOpParamType::WITH_TXN_ID) {
+        if (!config::enable_stream_load_commit_txn_on_be) {
+            VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
+            st = StreamLoadExecutor::operate_txn_2pc(ctx);
+        } else if (topt == TxnOpParamType::WITH_TXN_ID) {
             VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
             st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
         } else if (topt == TxnOpParamType::WITH_LABEL) {
@@ -93,12 +96,9 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
 }

 Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
-    if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
-        return StreamLoadExecutor::commit_txn(ctx);
-    }
-
     // forward to fe to excute commit transaction for MoW table
-    if (ctx->is_mow_table()) {
+    if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
+        ctx->load_type == TLoadType::ROUTINE_LOAD) {
         Status st;
         int retry_times = 0;
         while (retry_times < config::mow_stream_load_commit_retry_times) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c836bd3fb33..06156fd8598 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -541,6 +541,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
 DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");

 // The buffer size to store stream table function schema info
 DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8860e40b7ef..f0d127df0fc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -595,6 +595,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
 DECLARE_Int32(stream_load_record_expire_time_secs);
 // time interval to clean expired stream load records
 DECLARE_mInt64(clean_stream_load_record_interval_secs);
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DECLARE_mBool(enable_stream_load_commit_txn_on_be);

 // The buffer size to store stream table function schema info
 DECLARE_Int64(stream_tvf_buffer_size);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org