This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a5a18ba39b [improve](txn insert) Txn load support cloud mode (#34721)
0a5a18ba39b is described below

commit 0a5a18ba39b89a658257aa94cc8020dce1a0eb01
Author: meiyi <myime...@gmail.com>
AuthorDate: Wed Jun 5 15:49:24 2024 +0800

    [improve](txn insert) Txn load support cloud mode (#34721)
    
    ## Proposed changes
    
    ### Purpose
    
    The user doc:
    
https://doris.apache.org/zh-CN/docs/dev/data-operate/import/transaction-load-manual
    
    Transaction load already supports insert into select
    (https://github.com/apache/doris/pull/31666),
    update (https://github.com/apache/doris/pull/33034) and
    delete (https://github.com/apache/doris/pull/33100).
    
    https://github.com/apache/doris/pull/32980 implements writing more than
    one rowset to a single partition within one txn.

    This PR implements the cloud-mode counterpart of
    https://github.com/apache/doris/pull/32980.
    
    ### Implementation
    
    #### sub_txn_id
    
    see https://github.com/apache/doris/pull/32980
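
    Briefly: each statement in the transaction writes its temporary rowsets
    under its own sub_txn_id, so commit can collect one statement's rowsets
    with a single range scan (this is the scan commit_txn_with_sub_txn does
    below). A minimal sketch of that keying, where a toy std::map stands in
    for the real TxnKv key codec and the ids are made up:

    ```
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>

    // Toy stand-in for the tmp rowset keyspace: (sub_txn_id, tablet_id) -> meta.
    using TmpRowsetKey = std::pair<int64_t, int64_t>;

    int main() {
        std::map<TmpRowsetKey, std::string> kv = {
                {{5001, 12}, "rowset of sub_txn 5001 on tablet 12"},
                {{5001, 13}, "rowset of sub_txn 5001 on tablet 13"},
                {{5002, 18}, "rowset of sub_txn 5002 on tablet 18"},
        };

        // Range scan [(sub_txn_id, 0), (sub_txn_id + 1, 0)), like the scan
        // over meta_rowset_tmp_key during commit.
        int64_t sub_txn_id = 5001;
        auto begin = kv.lower_bound({sub_txn_id, 0});
        auto end = kv.lower_bound({sub_txn_id + 1, 0});
        for (auto it = begin; it != end; ++it) {
            std::cout << it->second << "\n";
        }
    }
    ```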
    
    #### Meta service supports commit txn
    
    This process is generally the same as commit_txn; the difference is that
    a partition's version is increased by 1 for each sub txn that writes to
    it (a minimal sketch of this bookkeeping follows the example below).
    
    One example:
    Suppose the table, partition, tablet and version info is:
    ```
    --------------------------------------------
    | table | partition | tablet    | version |
    --------------------------------------------
    | t1    | t1_p1     | t1_p1.1   | 1       |
    | t1    | t1_p1     | t1_p1.2   | 1       |
    | t1    | t1_p2     | t1_p2.1   | 2       |
    | t2    | t2_p3     | t2_p3.1   | 3       |
    | t2    | t2_p4     | t2_p4.1   | 4       |
    --------------------------------------------
    ```
    
    Now we commit a txn with 3 sub txns and the tablets are:
     *  sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
     *  sub_txn2: t2_p3.1
     *  sub_txn3: t1_p1.1, t1_p1.2
    
    During commit, the partition versions change as follows:
     *  sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
     *  sub_txn2: t2_p3(3 -> 4)
     *  sub_txn3: t1_p1(2 -> 3)
    
    After commit, the partition versions are:
     *  t1: t1_p1(3), t1_p2(3)
     *  t2: t2_p3(4), t2_p4(4)
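
    As referenced above, a minimal, self-contained sketch of this version
    bookkeeping (toy types and made-up partition ids, not the real meta
    service code): within one sub txn a partition is bumped at most once,
    however many of its tablets were written, and later sub txns observe
    the bumped value.

    ```
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct TmpRowset {
        std::string tablet;   // e.g. "t1_p1.1"
        int64_t partition_id; // 11 = t1_p1, 14 = t1_p2, 17 = t2_p3
    };

    int main() {
        // Versions from the table above (t2_p4 is never written, so omitted).
        std::map<int64_t, int64_t> versions = {{11, 1}, {14, 2}, {17, 3}};

        // The three sub txns and the tablets they wrote.
        std::vector<std::vector<TmpRowset>> sub_txns = {
                {{"t1_p1.1", 11}, {"t1_p1.2", 11}, {"t1_p2.1", 14}}, // sub_txn1
                {{"t2_p3.1", 17}},                                   // sub_txn2
                {{"t1_p1.1", 11}, {"t1_p1.2", 11}},                  // sub_txn3
        };

        for (const auto& rowsets : sub_txns) {
            std::map<int64_t, int64_t> sub_txn_version; // partition -> version
            for (const auto& rs : rowsets) {
                if (sub_txn_version.count(rs.partition_id) == 0) {
                    // First rowset of this partition in this sub txn: bump once.
                    sub_txn_version[rs.partition_id] = ++versions[rs.partition_id];
                }
                // Every rowset of this partition in this sub txn gets
                // sub_txn_version[rs.partition_id] as its start/end version.
            }
        }
        // Prints 11 -> 3, 14 -> 3, 17 -> 4, matching the example above.
        for (const auto& [p, v] : versions) {
            std::cout << p << " -> " << v << "\n";
        }
    }
    ```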
    
    #### Meta service supports generating sub_txn_id via `begin_sub_txn`
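
    In one meta-service KV txn, `begin_sub_txn` writes the txn_index_key for
    the new sub_txn_id, deletes the txn_label_key that was used to stamp it,
    and updates the txn state: the sub txn id is appended to sub_txn_ids
    (which the recycler later uses to clean up the index keys) and table_ids
    is replaced with the request's table ids. The sub_txn_id itself comes
    from a versionstamp assigned by the underlying TxnKv at commit time. A
    hedged, self-contained sketch of that two-step id generation, with a toy
    in-memory counter standing in for the FoundationDB versionstamp (only
    the stamp-then-read-back pattern is real):

    ```
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    struct ToyKv {
        std::map<std::string, int64_t> data;
        int64_t next_stamp = 1000; // stand-in for the FDB commit versionstamp

        // Mimics atomic_set_ver_value: the store picks the stamp at commit.
        void atomic_set_ver_value(const std::string& key) { data[key] = next_stamp++; }
        int64_t get(const std::string& key) const { return data.at(key); }
    };

    int main() {
        ToyKv kv;
        const std::string label_key = "txn_label/instance_x/db_1/test_label_0";

        // KV txn 1: ask the store to stamp the label key at commit time.
        kv.atomic_set_ver_value(label_key);

        // KV txn 2: read the stamp back; it becomes the new, globally
        // increasing sub_txn_id. The same KV txn then puts txn_index_key,
        // removes the label key, and updates the parent txn state.
        int64_t sub_txn_id = kv.get(label_key);
        std::cout << "sub_txn_id=" << sub_txn_id << "\n";
    }
    ```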
---
 cloud/src/common/bvars.cpp                         |   2 +
 cloud/src/common/bvars.h                           |   2 +
 cloud/src/meta-service/meta_service.h              |  24 +
 cloud/src/meta-service/meta_service_txn.cpp        | 965 ++++++++++++++++++++-
 cloud/src/recycler/recycler.cpp                    |   9 +
 cloud/test/meta_service_test.cpp                   | 320 +++++++
 cloud/test/recycler_test.cpp                       | 145 ++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  18 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  20 +
 .../transaction/CloudGlobalTransactionMgr.java     | 144 ++-
 .../apache/doris/transaction/TransactionEntry.java |  83 +-
 .../apache/doris/transaction/TransactionState.java |   6 +-
 gensrc/proto/cloud.proto                           |  48 +
 regression-test/data/insert_p0/txn_insert.out      | 131 +--
 .../data/insert_p0/txn_insert_inject_case.out      |   9 +
 regression-test/suites/insert_p0/txn_insert.groovy | 468 +++++-----
 .../insert_p0/txn_insert_concurrent_insert.groovy  |  26 +-
 .../suites/insert_p0/txn_insert_inject_case.groovy |  69 +-
 .../insert_p0/txn_insert_with_schema_change.groovy |  88 +-
 19 files changed, 2174 insertions(+), 403 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1aa436bb603..43acb47e365 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -27,6 +27,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_commit_txn("ms", "commit_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_max_txn_id");
+BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
+BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict");
 BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label");
 BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index b55e1051cd9..e5b50262104 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -126,6 +126,8 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
+extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn;
+extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_version;
 extern BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index 4dc4113f341..6ba3d5b45eb 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -61,6 +61,10 @@ public:
     void commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request,
                     CommitTxnResponse* response, ::google::protobuf::Closure* done) override;
 
+    void commit_txn_with_sub_txn(::google::protobuf::RpcController* controller,
+                                 const CommitTxnRequest* request, CommitTxnResponse* response,
+                                 ::google::protobuf::Closure* done);
+
     void abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request,
                    AbortTxnResponse* response, ::google::protobuf::Closure* done) override;
 
@@ -76,6 +80,14 @@ public:
                                 GetCurrentMaxTxnResponse* response,
                                 ::google::protobuf::Closure* done) override;
 
+    void begin_sub_txn(::google::protobuf::RpcController* controller,
+                       const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
+                       ::google::protobuf::Closure* done) override;
+
+    void abort_sub_txn(::google::protobuf::RpcController* controller,
+                       const AbortSubTxnRequest* request, AbortSubTxnResponse* response,
+                       ::google::protobuf::Closure* done) override;
+
     void check_txn_conflict(::google::protobuf::RpcController* controller,
                             const CheckTxnConflictRequest* request,
                             CheckTxnConflictResponse* response,
@@ -321,6 +333,18 @@ public:
         call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done);
     }
 
+    void begin_sub_txn(::google::protobuf::RpcController* controller,
+                       const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
+                       ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::begin_sub_txn, controller, request, response, done);
+    }
+
+    void abort_sub_txn(::google::protobuf::RpcController* controller,
+                       const AbortSubTxnRequest* request, AbortSubTxnResponse* response,
+                       ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::abort_sub_txn, controller, request, response, done);
+    }
+
     void check_txn_conflict(::google::protobuf::RpcController* controller,
                             const CheckTxnConflictRequest* request,
                             CheckTxnConflictResponse* response,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 7866fccaa39..5596cbbf7eb 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -662,6 +662,10 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
 void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
                                  const CommitTxnRequest* request, CommitTxnResponse* response,
                                  ::google::protobuf::Closure* done) {
+    if (request->has_is_txn_load() && request->is_txn_load()) {
+        commit_txn_with_sub_txn(controller, request, response, done);
+        return;
+    }
     RPC_PREPROCESS(commit_txn);
     if (!request->has_txn_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
@@ -1024,15 +1028,585 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_keys[i])
                   << " txn_id=" << txn_id;
 
-        for (auto tablet_id : table_id_tablet_ids[table_id]) {
-            std::string pending_key = 
meta_pending_delete_bitmap_key({instance_id, tablet_id});
-            txn->remove(pending_key);
-            LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" 
<< hex(pending_key)
-                      << " txn_id=" << txn_id;
-        }
+        for (auto tablet_id : table_id_tablet_ids[table_id]) {
+            std::string pending_key = 
meta_pending_delete_bitmap_key({instance_id, tablet_id});
+            txn->remove(pending_key);
+            LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" 
<< hex(pending_key)
+                      << " txn_id=" << txn_id;
+        }
+    }
+    lock_keys.clear();
+    lock_values.clear();
+
+    // Save rowset meta
+    for (auto& i : rowsets) {
+        size_t rowset_size = i.first.size() + i.second.size();
+        txn->put(i.first, i.second);
+        LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" << 
txn_id
+                  << " rowset_size=" << rowset_size;
+    }
+
+    // Save versions
+    for (auto& i : new_versions) {
+        std::string ver_val;
+        VersionPB version_pb;
+        version_pb.set_version(i.second);
+        if (!version_pb.SerializeToString(&ver_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id;
+            msg = ss.str();
+            return;
+        }
+
+        txn->put(i.first, ver_val);
+        LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " 
version:" << i.second
+                  << " txn_id=" << txn_id;
+
+        std::string_view ver_key = i.first;
+        ver_key.remove_prefix(1); // Remove key space
+        // PartitionVersionKeyInfo  {instance_id, db_id, table_id, 
partition_id}
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        int ret = decode_key(&ver_key, &out);
+        if (ret != 0) [[unlikely]] {
+            // failing to decode the version key means something is wrong;
+            // we cannot continue this txn
+            LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << 
hex(ver_key);
+            code = MetaServiceCode::UNDEFINED_ERR;
+            msg = "decode version key error";
+            return;
+        }
+
+        int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
+        int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
+        VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << 
partition_id;
+
+        response->add_table_ids(table_id);
+        response->add_partition_ids(partition_id);
+        response->add_versions(i.second);
+    }
+
+    // Save table versions
+    for (auto& i : table_id_tablet_ids) {
+        std::string ver_key = table_version_key({instance_id, db_id, i.first});
+        txn->atomic_add(ver_key, 1);
+        LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " 
txn_id=" << txn_id;
+    }
+
+    LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+
+    // Update txn_info
+    txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+
+    auto now_time = system_clock::now();
+    uint64_t commit_time = 
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
+    if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
+        code = MetaServiceCode::UNDEFINED_ERR;
+        msg = fmt::format("txn is expired, not allow to commit txn_id={}", 
txn_id);
+        LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
+                  << " timeout_ms=" << txn_info.timeout_ms() << " 
commit_time=" << commit_time;
+        return;
+    }
+    txn_info.set_commit_time(commit_time);
+    txn_info.set_finish_time(commit_time);
+    if (request->has_commit_attachment()) {
+        
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
+    }
+    LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
+    info_val.clear();
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+    txn->put(info_key, info_val);
+    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
+
+    // Update stats of affected tablet
+    std::deque<std::string> kv_pool;
+    std::function<void(const StatsTabletKeyInfo&, const TabletStats&)> 
update_tablet_stats;
+    if (config::split_tablet_stats) {
+        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+            if (stats.num_segs > 0) {
+                auto& data_size_key = kv_pool.emplace_back();
+                stats_tablet_data_size_key(info, &data_size_key);
+                txn->atomic_add(data_size_key, stats.data_size);
+                auto& num_rows_key = kv_pool.emplace_back();
+                stats_tablet_num_rows_key(info, &num_rows_key);
+                txn->atomic_add(num_rows_key, stats.num_rows);
+                auto& num_segs_key = kv_pool.emplace_back();
+                stats_tablet_num_segs_key(info, &num_segs_key);
+                txn->atomic_add(num_segs_key, stats.num_segs);
+            }
+            auto& num_rowsets_key = kv_pool.emplace_back();
+            stats_tablet_num_rowsets_key(info, &num_rowsets_key);
+            txn->atomic_add(num_rowsets_key, stats.num_rowsets);
+        };
+    } else {
+        update_tablet_stats = [&](const StatsTabletKeyInfo& info, const 
TabletStats& stats) {
+            auto& key = kv_pool.emplace_back();
+            stats_tablet_key(info, &key);
+            auto& val = kv_pool.emplace_back();
+            TxnErrorCode err = txn->get(key, &val);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TABLET_NOT_FOUND
+                                                              : 
cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get tablet stats, err={} 
tablet_id={}", err,
+                                  std::get<4>(info));
+                return;
+            }
+            TabletStatsPB stats_pb;
+            if (!stats_pb.ParseFromString(val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                msg = fmt::format("malformed tablet stats value, key={}", 
hex(key));
+                return;
+            }
+            stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
+            stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
+            stats_pb.set_num_rowsets(stats_pb.num_rowsets() + 
stats.num_rowsets);
+            stats_pb.set_num_segments(stats_pb.num_segments() + 
stats.num_segs);
+            stats_pb.SerializeToString(&val);
+            txn->put(key, val);
+        };
+    }
+    for (auto& [tablet_id, stats] : tablet_stats) {
+        DCHECK(tablet_ids.count(tablet_id));
+        auto& tablet_idx = tablet_ids[tablet_id];
+        StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                 tablet_idx.partition_id(), tablet_id};
+        update_tablet_stats(info, stats);
+        if (code != MetaServiceCode::OK) return;
+    }
+    // Remove tmp rowset meta
+    for (auto& [k, _] : tmp_rowsets_meta) {
+        txn->remove(k);
+        LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << 
txn_id;
+    }
+
+    const std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
+    LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" 
<< txn_id;
+    txn->remove(running_key);
+
+    std::string recycle_val;
+    std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
+    RecycleTxnPB recycle_pb;
+    recycle_pb.set_creation_time(commit_time);
+    recycle_pb.set_label(txn_info.label());
+
+    if (!recycle_pb.SerializeToString(&recycle_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+    txn->put(recycle_key, recycle_val);
+
+    if (txn_info.load_job_source_type() ==
+        LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
+        put_routine_load_progress(code, msg, instance_id, request, txn.get(), 
db_id);
+    }
+
+    LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << 
" txn_id=" << txn_id;
+    LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
+              << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" 
<< txn->num_del_keys()
+              << " txn_size=" << txn->approximate_bytes() << " txn_id=" << 
txn_id;
+
+    // Finally we are done...
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    // calculate table stats from tablets stats
+    std::map<int64_t /*table_id*/, TableStats> table_stats;
+    std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
+                                         request->base_tablet_ids().end());
+    calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
+    for (const auto& pair : table_stats) {
+        TableStatsPB* stats_pb = response->add_table_stats();
+        auto table_id = pair.first;
+        auto stats = pair.second;
+        get_pb_from_tablestats(stats, stats_pb);
+        stats_pb->set_table_id(table_id);
+        VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
+                   << " table_id=" << table_id
+                   << " updated_row_count=" << stats_pb->updated_row_count();
+    }
+
+    response->mutable_txn_info()->CopyFrom(txn_info);
+} // end commit_txn
+
+/**
+ * This process is generally the same as commit_txn; the difference is that
+ * a partition's version is increased by 1 for each sub txn that writes to it.
+ *
+ * One example:
+ *  Suppose the table, partition, tablet and version info is:
+ *  --------------------------------------------
+ *  | table | partition | tablet    | version |
+ *  --------------------------------------------
+ *  | t1    | t1_p1     | t1_p1.1   | 1       |
+ *  | t1    | t1_p1     | t1_p1.2   | 1       |
+ *  | t1    | t1_p2     | t1_p2.1   | 2       |
+ *  | t2    | t2_p3     | t2_p3.1   | 3       |
+ *  | t2    | t2_p4     | t2_p4.1   | 4       |
+ *  --------------------------------------------
+ *
+ *  Now we commit a txn with 3 sub txns and the tablets are:
+ *    sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
+ *    sub_txn2: t2_p3.1
+ *    sub_txn3: t1_p1.1, t1_p1.2
+ *  During commit, the partition versions change as follows:
+ *    sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
+ *    sub_txn2: t2_p3(3 -> 4)
+ *    sub_txn3: t1_p1(2 -> 3)
+ *  After commit, the partition versions are:
+ *    t1: t1_p1(3), t1_p2(3)
+ *    t2: t2_p3(4), t2_p4(4)
+ */
+void MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* controller,
+                                              const CommitTxnRequest* request,
+                                              CommitTxnResponse* response,
+                                              ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(commit_txn);
+    if (!request->has_txn_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid argument, missing txn id";
+        return;
+    }
+
+    int64_t txn_id = request->txn_id();
+    auto sub_txn_infos = request->sub_txn_infos();
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " 
txn_id=" << txn_id;
+        return;
+    }
+
+    RPC_RATE_LIMIT(commit_txn)
+
+    // Create a readonly txn for scanning tmp rowsets
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // Get db id with txn id
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    DCHECK(index_pb.tablet_index().has_db_id() == true);
+    int64_t db_id = index_pb.tablet_index().db_id();
+
+    // Get temporary rowsets involved in the txn
+    std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
+            sub_txn_to_tmp_rowsets_meta;
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        // This is a range scan
+        MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
+        MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
+        std::string rs_tmp_key0;
+        std::string rs_tmp_key1;
+        meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+        meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+        // Get rowset meta that should be committed
+        //                   tmp_rowset_key -> rowset_meta
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> 
tmp_rowsets_meta;
+
+        int num_rowsets = 0;
+        std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+                (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id, 
&sub_txn_id](int*) {
+                    LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
+                              << ", sub_txn_id=" << sub_txn_id << " 
num_rowsets=" << num_rowsets
+                              << " range=[" << hex(rs_tmp_key0) << "," << 
hex(rs_tmp_key1) << ")";
+                });
+
+        std::unique_ptr<RangeGetIterator> it;
+        do {
+            err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+            if (err == TxnErrorCode::TXN_TOO_OLD) {
+                err = txn_kv_->create_txn(&txn);
+                if (err == TxnErrorCode::TXN_OK) {
+                    err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+                }
+            }
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "internal error, failed to get tmp rowset while 
committing, txn_id=" << txn_id
+                   << " err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+
+            while (it->has_next()) {
+                auto [k, v] = it->next();
+                LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " 
txn_id=" << txn_id;
+                tmp_rowsets_meta.emplace_back();
+                if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), 
v.size())) {
+                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                    ss << "malformed rowset meta, unable to initialize, 
txn_id=" << txn_id
+                       << " key=" << hex(k);
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                // Save keys that will be removed later
+                tmp_rowsets_meta.back().first = std::string(k.data(), 
k.size());
+                ++num_rowsets;
+                if (!it->has_next()) rs_tmp_key0 = k;
+            }
+            rs_tmp_key0.push_back('\x00'); // Update to next smallest key for 
iteration
+        } while (it->more());
+
+        VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
+                   << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
+        sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, 
std::move(tmp_rowsets_meta));
+    }
+
+    // Create a read/write txn to guarantee consistency
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // Get txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // TODO: do more check like txn state
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+        code = MetaServiceCode::TXN_ALREADY_ABORTED;
+        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+        code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" 
<< txn_id;
+        msg = ss.str();
+        response->mutable_txn_info()->CopyFrom(txn_info);
+        return;
+    }
+
+    LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+
+    // Prepare rowset meta and new_versions
+    // Read tablet indexes in batch.
+    std::map<int64_t, int64_t> tablet_id_to_idx;
+    std::vector<std::string> tablet_idx_keys;
+    std::vector<int64_t> partition_ids;
+    auto idx = 0;
+    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            auto tablet_id = i.tablet_id();
+            if (tablet_id_to_idx.count(tablet_id) == 0) {
+                tablet_id_to_idx.emplace(tablet_id, idx);
+                tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
+                partition_ids.push_back(i.partition_id());
+                idx++;
+            }
+        }
+    }
+    std::vector<std::optional<std::string>> tablet_idx_values;
+    err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, 
Transaction::BatchGetOptions(false));
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get tablet table index ids, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+
+    // tablet_id -> {table/index/partition}_id
+    std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+    // table_id -> tablets_ids
+    std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+    for (auto [tablet_id, i] : tablet_id_to_idx) {
+        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+            // The value must exist
+            code = MetaServiceCode::KV_TXN_GET_ERR;
+            ss << "failed to get tablet table index ids, err=not found"
+               << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
+            msg = ss.str();
+            LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+            return;
+        }
+        if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "malformed tablet index value tablet_id=" << tablet_id << " 
txn_id=" << txn_id;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+        VLOG_DEBUG << "tablet_id:" << tablet_id
+                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
+    }
+
+    tablet_idx_keys.clear();
+    tablet_idx_values.clear();
+
+    // {table/partition} -> version
+    std::unordered_map<std::string, uint64_t> new_versions;
+    std::vector<std::string> version_keys;
+    for (auto& [tablet_id, i] : tablet_id_to_idx) {
+        int64_t table_id = tablet_ids[tablet_id].table_id();
+        int64_t partition_id = partition_ids[i];
+        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
+        if (new_versions.count(ver_key) == 0) {
+            new_versions.insert({ver_key, 0});
+            LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) << 
" txn_id=" << txn_id
+                      << ", db_id=" << db_id << ", table_id=" << table_id
+                      << ", partition_id=" << partition_id;
+            version_keys.push_back(std::move(ver_key));
+        }
+    }
+    std::vector<std::optional<std::string>> version_values;
+    err = txn->batch_get(&version_values, version_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get partition versions, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_versions = version_keys.size();
+    for (size_t i = 0; i < total_versions; i++) {
+        int64_t version;
+        if (version_values[i].has_value()) {
+            VersionPB version_pb;
+            if (!version_pb.ParseFromString(version_values[i].value())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse version pb txn_id=" << txn_id
+                   << " key=" << hex(version_keys[i]);
+                msg = ss.str();
+                return;
+            }
+            version = version_pb.version();
+        } else {
+            version = 1;
+        }
+        new_versions[version_keys[i]] = version;
+        LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
+                  << " version:" << version << " txn_id=" << txn_id;
+    }
+    version_keys.clear();
+    version_values.clear();
+
+    std::vector<std::pair<std::string, std::string>> rowsets;
+    std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
+    for (const auto& sub_txn_info : sub_txn_infos) {
+        auto sub_txn_id = sub_txn_info.sub_txn_id();
+        auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
+        std::unordered_map<int64_t, int64_t> partition_id_to_version;
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            int64_t tablet_id = i.tablet_id();
+            int64_t table_id = tablet_ids[tablet_id].table_id();
+            int64_t partition_id = i.partition_id();
+            std::string ver_key =
+                    partition_version_key({instance_id, db_id, table_id, 
partition_id});
+            if (new_versions.count(ver_key) == 0) [[unlikely]] {
+                // this should be impossible.
+                code = MetaServiceCode::UNDEFINED_ERR;
+                ss << "failed to get partition version key, the target version 
not exists in "
+                      "new_versions."
+                   << " txn_id=" << txn_id << ", db_id=" << db_id << ", 
table_id=" << table_id
+                   << ", partition_id=" << partition_id;
+                msg = ss.str();
+                LOG(ERROR) << msg;
+                return;
+            }
+
+            // Update rowset version
+            int64_t new_version = new_versions[ver_key];
+            if (partition_id_to_version.count(partition_id) == 0) {
+                new_versions[ver_key] = new_version + 1;
+                new_version = new_versions[ver_key];
+                partition_id_to_version[partition_id] = new_version;
+            }
+            i.set_start_version(new_version);
+            i.set_end_version(new_version);
+            LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
+                      << ", sub_txn_id=" << sub_txn_id << ", table_id=" << 
table_id
+                      << ", partition_id=" << partition_id << ", tablet_id=" 
<< tablet_id
+                      << ", new_version=" << new_version;
+
+            std::string key = meta_rowset_key({instance_id, tablet_id, 
i.end_version()});
+            std::string val;
+            if (!i.SerializeToString(&val)) {
+                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
+                msg = ss.str();
+                return;
+            }
+            rowsets.emplace_back(std::move(key), std::move(val));
+
+            // Accumulate affected rows
+            auto& stats = tablet_stats[tablet_id];
+            stats.data_size += i.data_disk_size();
+            stats.num_rows += i.num_rows();
+            ++stats.num_rowsets;
+            stats.num_segs += i.num_segments();
+        } // for tmp_rowsets_meta
     }
-    lock_keys.clear();
-    lock_values.clear();
 
     // Save rowset meta
     for (auto& i : rowsets) {
@@ -1074,7 +1648,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
 
         int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
         int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
-        VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << 
partition_id;
+        VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
+                   << " partition_id=" << partition_id << " version=" << 
i.second;
 
         response->add_table_ids(table_id);
         response->add_partition_ids(partition_id);
@@ -1174,9 +1749,11 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         if (code != MetaServiceCode::OK) return;
     }
     // Remove tmp rowset meta
-    for (auto& [k, _] : tmp_rowsets_meta) {
-        txn->remove(k);
-        LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << 
txn_id;
+    for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (auto& [k, _] : tmp_rowsets_meta) {
+            txn->remove(k);
+            LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" 
<< txn_id;
+        }
     }
 
     const std::string running_key = txn_running_key({instance_id, db_id, 
txn_id});
@@ -1197,11 +1774,6 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     }
     txn->put(recycle_key, recycle_val);
 
-    if (txn_info.load_job_source_type() ==
-        LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
-        put_routine_load_progress(code, msg, instance_id, request, txn.get(), 
db_id);
-    }
-
     LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << 
" txn_id=" << txn_id;
     LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
               << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" 
<< txn->num_del_keys()
@@ -1233,7 +1805,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     }
 
     response->mutable_txn_info()->CopyFrom(txn_info);
-} // end commit_txn
+} // end commit_txn_with_sub_txn
 
 void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
                                 const AbortTxnRequest* request, AbortTxnResponse* response,
@@ -1612,6 +2184,327 @@ void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
     response->set_current_max_txn_id(current_max_txn_id);
 }
 
+/**
+ * 1. Generate a sub_txn_id
+ *
+ * The following steps are done in a txn:
+ * 2. Put txn_index_key in sub_txn_id
+ * 3. Delete txn_label_key in sub_txn_id
+ * 4. Modify the txn state of the txn_id:
+ *    - Add the sub txn id to sub_txn_ids: the recycler uses it to recycle the txn_index_key
+ *    - Add the table id to table_ids
+ */
+void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller,
+                                    const BeginSubTxnRequest* request,
+                                    BeginSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(begin_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() 
: -1;
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    auto& table_ids = request->table_ids();
+    std::string label = request->has_label() ? request->label() : "";
+    if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() || 
label.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" << 
sub_txn_num
+           << " db_id=" << db_id << ", label=" << label << ", table_ids=[";
+        for (auto table_id : table_ids) {
+            ss << table_id << ", ";
+        }
+        ss << "]";
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(begin_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    const std::string label_key = txn_label_key({instance_id, db_id, label});
+    std::string label_val;
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) 
{
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get failed(), err=" << err << " label=" << label;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // err == OK means this is a retry rpc?
+    if (err == TxnErrorCode::TXN_OK) {
+        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
+    }
+
+    // ret > 0 means the label did not exist previously.
+    txn->atomic_set_ver_value(label_key, label_val);
+    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "txn->commit failed(), label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    // 2. Get sub txn id from version stamp
+    txn.reset();
+    err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn when get txn id, label=" << label << " 
err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    label_val.clear();
+    err = txn->get(label_key, &label_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "txn->get() failed, label=" << label << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label 
<< " err=" << err;
+
+    // Generated by TxnKv system
+    int64_t sub_txn_id = 0;
+    int ret =
+            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
+                                           label_val.size() - 
VERSION_STAMP_LEN, label_val.size()),
+                                   &sub_txn_id);
+    if (ret != 0) {
+        code = MetaServiceCode::TXN_GEN_ID_ERR;
+        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << 
ret;
+        msg = ss.str();
+        return;
+    }
+
+    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" 
<< sub_txn_id
+              << " txn_id=" << txn_id << " label_val.size()=" << 
label_val.size();
+
+    // write txn_index_key
+    const std::string index_key = txn_index_key({instance_id, sub_txn_id});
+    std::string index_val;
+    TxnIndexPB index_pb;
+    if (!index_pb.SerializeToString(&index_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_index_pb "
+           << "label=" << label << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    if (txn_info.sub_txn_ids().size() != sub_txn_num) {
+        code = MetaServiceCode::UNDEFINED_ERR;
+        ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected 
sub_txn_num=" << sub_txn_num
+           << ", txn_info.sub_txn_ids=[";
+        for (auto id : txn_info.sub_txn_ids()) {
+            ss << id << ", ";
+        }
+        ss << "]";
+        msg = ss.str();
+        LOG(WARNING) << msg;
+    }
+    txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
+    txn_info.mutable_table_ids()->Clear();
+    for (auto table_id : table_ids) {
+        txn_info.mutable_table_ids()->Add(table_id);
+    }
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->remove(label_key);
+    txn->put(info_key, info_val);
+    txn->put(index_key, index_val);
+    LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
+              << ", remove label_key=" << hex(label_key) << ", put info_key=" 
<< hex(info_key)
+              << ", put index_key=" << hex(index_key);
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+    response->set_sub_txn_id(sub_txn_id);
+    response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
+/**
+ * 1. Modify the txn state of the txn_id:
+ *    - Remove the table id from table_ids
+ */
+void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller,
+                                    const AbortSubTxnRequest* request,
+                                    AbortSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(abort_sub_txn);
+    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+    int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : 
-1;
+    int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() 
: -1;
+    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+    auto& table_ids = request->table_ids();
+    if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" << 
sub_txn_id
+           << ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ", 
table_ids=[";
+        for (auto table_id : table_ids) {
+            ss << table_id << ", ";
+        }
+        ss << "]";
+        msg = ss.str();
+        return;
+    }
+
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+
+    RPC_RATE_LIMIT(abort_sub_txn)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
+        msg = ss.str();
+        return;
+    }
+
+    // Get and update txn info with db_id and txn_id
+    std::string info_val; // Will be reused when saving updated txn
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
+           << " sub_txn_id=" << sub_txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
+           << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+    DCHECK(txn_info.txn_id() == txn_id);
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        code = MetaServiceCode::TXN_INVALID_STATUS;
+        ss << "transaction status is " << txn_info.status() << " : db_id=" << 
db_id
+           << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // Remove the table id; the sub_txn_id does not need to be removed
+    if (txn_info.sub_txn_ids().size() != sub_txn_num) {
+        code = MetaServiceCode::UNDEFINED_ERR;
+        ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" << 
sub_txn_id
+           << ", expected sub_txn_num=" << sub_txn_num << ", 
txn_info.sub_txn_ids=[";
+        for (auto id : txn_info.sub_txn_ids()) {
+            ss << id << ", ";
+        }
+        ss << "]";
+        msg = ss.str();
+        LOG(WARNING) << msg;
+    }
+    txn_info.mutable_table_ids()->Clear();
+    for (auto table_id : table_ids) {
+        txn_info.mutable_table_ids()->Add(table_id);
+    }
+    // TODO: should we try to delete txn_label_key if begin_sub_txn failed to delete it?
+
+    if (!txn_info.SerializeToString(&info_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id
+           << " sub_txn_id=" << sub_txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->put(info_key, info_val);
+    LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
+              << ", put info_key=" << hex(info_key);
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id=" 
<< sub_txn_id
+           << ", err=" << err;
+        msg = ss.str();
+        return;
+    }
+    response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
 void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller,
                                          const CheckTxnConflictRequest* request,
                                          CheckTxnConflictResponse* response,
@@ -1686,24 +2579,24 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
 
             if (running_pb.timeout_time() < check_time) {
                 skip_timeout_txn_cnt++;
-                break;
-            }
-
-            LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k)
-                      << " running_pb=" << running_pb.ShortDebugString();
-            std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(),
-                                                   running_pb.table_ids().end());
-            std::sort(running_table_ids.begin(), running_table_ids.end());
-            std::vector<int64_t> result(std::min(running_table_ids.size(), src_table_ids.size()));
-            std::vector<int64_t>::iterator iter = std::set_intersection(
-                    src_table_ids.begin(), src_table_ids.end(), running_table_ids.begin(),
-                    running_table_ids.end(), result.begin());
-            result.resize(iter - result.begin());
-            if (result.size() > 0) {
-                response->set_finished(false);
-                LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
-                          << " total iteration count: " << total_iteration_cnt;
-                return;
+            } else {
+                LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k)
+                          << " running_pb=" << running_pb.ShortDebugString();
+                std::vector<int64_t> running_table_ids(running_pb.table_ids().begin(),
+                                                       running_pb.table_ids().end());
+                std::sort(running_table_ids.begin(), running_table_ids.end());
+                std::vector<int64_t> result(
+                        std::min(running_table_ids.size(), src_table_ids.size()));
+                std::vector<int64_t>::iterator iter = std::set_intersection(
+                        src_table_ids.begin(), src_table_ids.end(), running_table_ids.begin(),
+                        running_table_ids.end(), result.begin());
+                result.resize(iter - result.begin());
+                if (result.size() > 0) {
+                    response->set_finished(false);
+                    LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
+                              << " total iteration count: " << total_iteration_cnt;
+                    return;
+                }
             }
 
             if (!it->has_next()) {
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 05744bc596d..ca4c17b61ff 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1906,6 +1906,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
             return -1;
         }
         txn->remove(info_key);
+        // Remove sub txn index kvs
+        std::vector<std::string> sub_txn_index_keys;
+        for (auto sub_txn_id : txn_info.sub_txn_ids()) {
+            auto sub_txn_index_key = txn_index_key({instance_id_, sub_txn_id});
+            sub_txn_index_keys.push_back(sub_txn_index_key);
+        }
+        for (auto& sub_txn_index_key : sub_txn_index_keys) {
+            txn->remove(sub_txn_index_key);
+        }
         // Update txn label
         std::string label_key, label_val;
         txn_label_key({instance_id_, db_id, txn_info.label()}, &label_key);
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index b2b203f66e0..013433239d3 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1332,6 +1332,326 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
     }
 }
 
+TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
+    auto meta_service = get_meta_service();
+    int64_t db_id = 98131;
+    int64_t txn_id = -1;
+    int64_t t1 = 10;
+    int64_t t1_index = 100;
+    int64_t t1_p1 = 11;
+    int64_t t1_p1_t1 = 12;
+    int64_t t1_p1_t2 = 13;
+    int64_t t1_p2 = 14;
+    int64_t t1_p2_t1 = 15;
+    int64_t t2 = 16;
+    int64_t t2_index = 101;
+    int64_t t2_p3 = 17;
+    int64_t t2_p3_t1 = 18;
+    [[maybe_unused]] int64_t t2_p4 = 19;
+    [[maybe_unused]] int64_t t2_p4_t1 = 20;
+    // begin txn
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label");
+        txn_info_pb.add_table_ids(t1);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet: for sub_txn1
+    int64_t sub_txn_id1 = txn_id;
+    {
+        create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
+        auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t1, t1_p1);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+    {
+        create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
+        auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t2, t1_p1);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+    {
+        create_tablet(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1);
+        auto tmp_rowset = create_rowset(sub_txn_id1, t1_p2_t1, t1_p2);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // begin_sub_txn2
+    int64_t sub_txn_id2 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(0);
+        req.set_db_id(db_id);
+        req.set_label("test_label_0");
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        BeginSubTxnResponse res;
+        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id2 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+    }
+    // mock rowset and tablet: for sub_txn2
+    {
+        create_tablet(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1);
+        auto tmp_rowset = create_rowset(sub_txn_id2, t2_p3_t1, t2_p3);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // begin_sub_txn3
+    int64_t sub_txn_id3 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(1);
+        req.set_db_id(db_id);
+        req.set_label("test_label_1");
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        req.mutable_table_ids()->Add(t1);
+        BeginSubTxnResponse res;
+        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id3 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+    }
+    // mock rowset and tablet: for sub_txn3
+    {
+        create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
+        auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t1, t1_p1);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+    {
+        create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
+        auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t2, t1_p1);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // commit txn
+    CommitTxnRequest req;
+    {
+        brpc::Controller cntl;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(666);
+        req.set_txn_id(txn_id);
+        req.set_is_txn_load(true);
+
+        SubTxnInfo sub_txn_info1;
+        sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+        sub_txn_info1.set_table_id(t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);
+
+        SubTxnInfo sub_txn_info2;
+        sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+        sub_txn_info2.set_table_id(t2);
+        sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);
+
+        SubTxnInfo sub_txn_info3;
+        sub_txn_info3.set_sub_txn_id(sub_txn_id3);
+        sub_txn_info3.set_table_id(t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);
+
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3));
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        // std::cout << res.DebugString() << std::endl;
+        ASSERT_EQ(res.table_ids().size(), 3);
+
+        ASSERT_EQ(res.table_ids()[0], t2);
+        ASSERT_EQ(res.partition_ids()[0], t2_p3);
+        ASSERT_EQ(res.versions()[0], 2);
+
+        ASSERT_EQ(res.table_ids()[1], t1);
+        ASSERT_EQ(res.partition_ids()[1], t1_p2);
+        ASSERT_EQ(res.versions()[1], 2);
+
+        ASSERT_EQ(res.table_ids()[2], t1);
+        ASSERT_EQ(res.partition_ids()[2], t1_p1);
+        ASSERT_EQ(res.versions()[2], 3);
+    }
+
+    // commit the txn again; it should report TXN_ALREADY_VISIBLE
+    {
+        brpc::Controller cntl;
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE);
+        auto found = res.status().msg().find(
+                fmt::format("transaction is already visible: db_id={} txn_id={}", db_id, txn_id));
+        ASSERT_TRUE(found != std::string::npos);
+    }
+}
+
+TEST(MetaServiceTest, BeginAndAbortSubTxnTest) {
+    auto meta_service = get_meta_service();
+    long db_id = 98762;
+    int64_t txn_id = -1;
+    // begin txn
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label");
+        txn_info_pb.add_table_ids(1234);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+    // case: begin 2 sub txn
+    int64_t sub_txn_id1 = -1;
+    int64_t sub_txn_id2 = -1;
+    for (int i = 0; i < 2; i++) {
+        {
+            brpc::Controller cntl;
+            BeginSubTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id);
+            req.set_sub_txn_num(i);
+            req.set_db_id(db_id);
+            req.set_label("test_label_" + std::to_string(i));
+            req.mutable_table_ids()->Add(1234);
+            req.mutable_table_ids()->Add(1235);
+            if (i == 1) {
+                req.mutable_table_ids()->Add(1235);
+            }
+            BeginSubTxnResponse res;
+            meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 2 : 3);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), i == 0 ? 1 : 2);
+            ASSERT_TRUE(res.has_sub_txn_id());
+            if (i == 0) {
+                sub_txn_id1 = res.sub_txn_id();
+                ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+            } else {
+                sub_txn_id2 = res.sub_txn_id();
+                ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+                ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]);
+            }
+        }
+        // get txn state
+        {
+            brpc::Controller cntl;
+            GetTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id);
+            GetTxnResponse res;
+            meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                  &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 2 : 3);
+            ASSERT_EQ(res.txn_info().table_ids()[0], 1234);
+            ASSERT_EQ(res.txn_info().table_ids()[1], 1235);
+            if (i == 1) {
+                ASSERT_EQ(res.txn_info().table_ids()[2], 1235);
+            }
+        }
+    }
+    // case: abort sub txn
+    {
+        {
+            brpc::Controller cntl;
+            AbortSubTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id);
+            req.set_sub_txn_id(sub_txn_id2);
+            req.set_sub_txn_num(2);
+            req.set_db_id(db_id);
+            req.mutable_table_ids()->Add(1234);
+            req.mutable_table_ids()->Add(1235);
+            AbortSubTxnResponse res;
+            meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            // check txn state
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+            ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]);
+        }
+        // get txn state
+        {
+            brpc::Controller cntl;
+            GetTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id);
+            GetTxnResponse res;
+            meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                  &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().table_ids()[0], 1234);
+            ASSERT_EQ(res.txn_info().table_ids()[1], 1235);
+        }
+    }
+    // check label key does not exist
+    for (int i = 0; i < 2; i++) {
+        std::string key =
+                txn_label_key({"test_instance", db_id, "test_label_" + std::to_string(i)});
+        std::string val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+    // check txn index key exist
+    for (auto i : {sub_txn_id1, sub_txn_id2}) {
+        std::string key = txn_index_key({"test_instance", i});
+        std::string val;
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+    }
+}
+
 TEST(MetaServiceTest, AbortTxnTest) {
     auto meta_service = get_meta_service();
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index f653f4e8e0b..07aa5b2d40b 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -1464,6 +1464,151 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
         ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2);
         ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0);
     }
+
+    label = "recycle_expired_txn_label_with_sub_txn";
+    int64_t table2_id = 12131278;
+    {
+        // 1. begin_txn
+        // 2. begin_sub_txn2
+        // 3. begin_sub_txn3
+        // 4. abort_sub_txn3
+        // 5. commit_txn
+        // 6. recycle_expired_txn_label
+        // 7. check
+        [[maybe_unused]] int64_t sub_txn_id1 = -1;
+        int64_t sub_txn_id2 = -1;
+        int64_t sub_txn_id3 = -1;
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+
+            req.set_cloud_unique_id(cloud_unique_id);
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label(label);
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(10000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            txn_id = res.txn_id();
+            sub_txn_id1 = txn_id;
+            ASSERT_GT(txn_id, -1);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+        InstanceInfoPB instance;
+        instance.set_instance_id(mock_instance);
+        InstanceRecycler recycler(txn_kv, instance);
+        ASSERT_EQ(recycler.init(), 0);
+        sleep(1);
+        recycler.abort_timeout_txn();
+        TxnInfoPB txn_info_pb;
+        get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb);
+        ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED);
+
+        // 2. begin sub_txn2
+        {
+            brpc::Controller cntl;
+            BeginSubTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_txn_id(txn_id);
+            req.set_sub_txn_num(0);
+            req.set_db_id(db_id);
+            req.set_label("test_sub_label1");
+            req.mutable_table_ids()->Add(table_id);
+            req.mutable_table_ids()->Add(table2_id);
+            BeginSubTxnResponse res;
+            meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+            ASSERT_TRUE(res.has_sub_txn_id());
+            sub_txn_id2 = res.sub_txn_id();
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+        }
+
+        // 3. begin sub_txn3
+        {
+            brpc::Controller cntl;
+            BeginSubTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_txn_id(txn_id);
+            req.set_sub_txn_num(1);
+            req.set_db_id(db_id);
+            req.set_label("test_sub_label2");
+            req.mutable_table_ids()->Add(table_id);
+            req.mutable_table_ids()->Add(table2_id);
+            req.mutable_table_ids()->Add(table_id);
+            BeginSubTxnResponse res;
+            meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+            ASSERT_TRUE(res.has_sub_txn_id());
+            sub_txn_id3 = res.sub_txn_id();
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+            ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+        }
+
+        // 4. abort sub_txn3
+        {
+            brpc::Controller cntl;
+            AbortSubTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id);
+            req.set_sub_txn_num(2);
+            req.set_sub_txn_id(sub_txn_id3);
+            req.set_db_id(db_id);
+            req.mutable_table_ids()->Add(table_id);
+            req.mutable_table_ids()->Add(table2_id);
+            AbortSubTxnResponse res;
+            meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            // check txn state
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+            ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+        }
+
+        // 5. commit_txn
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_txn_load(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+        // check txn_index_key for sub_txn_id exist
+        for (auto i : {sub_txn_id2, sub_txn_id3}) {
+            std::string key = txn_index_key({mock_instance, i});
+            std::string val;
+            std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+        }
+        // 6. recycle_expired_txn_label
+        recycler.recycle_expired_txn_label();
+        ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2);
+        ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0);
+        // check txn_index_key for sub_txn_id are deleted
+        for (auto i : {sub_txn_id2, sub_txn_id3}) {
+            std::string key = txn_index_key({mock_instance, i});
+            std::string val;
+            std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+        }
+    }
 }
 
 void create_object_file_pb(std::string prefix, std::vector<ObjectFilePB>* object_files,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 7463a684680..f7a178deb01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -200,6 +200,24 @@ public class MetaServiceClient {
         return blockingStub.getCurrentMaxTxnId(request);
     }
 
+    public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) {
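+        // fill in the cloud unique id from the FE config when the caller did not set it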
+        if (!request.hasCloudUniqueId()) {
+            Cloud.BeginSubTxnRequest.Builder builder = Cloud.BeginSubTxnRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.beginSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.beginSubTxn(request);
+    }
+
+    public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.AbortSubTxnRequest.Builder builder = Cloud.AbortSubTxnRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.abortSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.abortSubTxn(request);
+    }
+
     public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request) {
         return blockingStub.checkTxnConflict(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 117cfd71bd0..a2dbdaac2c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -246,6 +246,26 @@ public class MetaServiceProxy {
         }
     }
 
+    public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.beginSubTxn(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
+    public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.abortSubTxn(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request)
             throws RpcException {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 98c3c5dfc8d..3184d150df7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -28,8 +28,12 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
+import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest;
+import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.BeginTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictRequest;
@@ -50,6 +54,7 @@ import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
 import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.SubTxnInfo;
 import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
 import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
 import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
@@ -63,6 +68,7 @@ import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
@@ -124,10 +130,12 @@ import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -139,6 +147,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15;
 
     private TxnStateCallbackFactory callbackFactory;
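+    // maps a sub txn id to its parent txn id so that getTransactionState can resolve sub txn ids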
+    private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>();
 
     public CloudGlobalTransactionMgr() {
         this.callbackFactory = new TxnStateCallbackFactory();
@@ -453,6 +462,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
+        commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
+    }
+
+    private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
+            List<Table> tableList) throws UserException {
         CommitTxnResponse commitTxnResponse = null;
         int retryTime = 0;
 
@@ -783,7 +797,35 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     @Override
     public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
             List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException {
-        throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud");
+        if (Config.disable_load_job) {
+            throw new TransactionCommitFailedException(
+                    "disable_load_job is set to true, all load jobs are not allowed");
+        }
+        LOG.info("try to commit transaction, txnId: {}, subTxnStates: {}", transactionId, subTransactionStates);
+        cleanSubTransactions(transactionId);
+        CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
+        builder.setDbId(db.getId())
+                .setTxnId(transactionId)
+                .setIs2Pc(false)
+                .setCloudUniqueId(Config.cloud_unique_id)
+                .setIsTxnLoad(true);
+        // add sub txn infos
+        for (SubTransactionState subTransactionState : subTransactionStates) {
+            builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
+                    .setTableId(subTransactionState.getTable().getId())
+                    .addAllBaseTabletIds(
+                            getBaseTabletsFromTables(Lists.newArrayList(subTransactionState.getTable()),
+                                    subTransactionState.getTabletCommitInfos().stream()
+                                            .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId()))
+                                            .collect(Collectors.toList())))
+                    .build());
+        }
+
+        final CommitTxnRequest commitTxnRequest = builder.build();
+        commitTxn(commitTxnRequest, transactionId, false, db.getId(),
+                subTransactionStates.stream().map(SubTransactionState::getTable)
+                        .collect(Collectors.toList()));
+        return true;
     }
 
     @Override
@@ -812,6 +854,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
 
     @Override
     public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException {
+        cleanSubTransactions(transactionId);
         abortTransaction(dbId, transactionId, reason, null, null);
     }
 
@@ -1133,6 +1176,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
 
     @Override
     public TransactionState getTransactionState(long dbId, long transactionId) {
+        if (subTxnIdToTxnId.containsKey(transactionId)) {
+            LOG.info("try to get transaction state, subTxnId:{}, transactionId:{}", transactionId,
+                    subTxnIdToTxnId.get(transactionId));
+            transactionId = subTxnIdToTxnId.get(transactionId);
+        }
         LOG.info("try to get transaction state, dbId:{}, transactionId:{}", dbId, transactionId);
         GetTxnRequest.Builder builder = GetTxnRequest.newBuilder();
         builder.setDbId(dbId);
@@ -1351,11 +1399,101 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
 
     @Override
     public void addSubTransaction(long dbId, long transactionId, long subTransactionId) {
-        throw new UnsupportedOperationException("addSubTransaction is not supported in cloud");
+        subTxnIdToTxnId.put(subTransactionId, transactionId);
     }
 
     @Override
     public void removeSubTransaction(long dbId, long subTransactionId) {
-        throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud");
+        subTxnIdToTxnId.remove(subTransactionId);
+    }
+
+    private void cleanSubTransactions(long transactionId) {
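+        // remove every sub txn mapping that belongs to this txn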
+        Iterator<Entry<Long, Long>> iterator = subTxnIdToTxnId.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<Long, Long> entry = iterator.next();
+            if (entry.getValue() == transactionId) {
+                iterator.remove();
+            }
+        }
+    }
+
+    public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId, List<Long> tableIds, String label,
+            long subTxnNum) throws UserException {
+        LOG.info("try to begin sub transaction, txnId: {}, dbId: {}, tableIds: {}, label: {}, subTxnNum: {}", txnId,
+                dbId, tableIds, label, subTxnNum);
+        BeginSubTxnRequest request = BeginSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id)
+                .setTxnId(txnId).setDbId(dbId).addAllTableIds(tableIds).setLabel(label).setSubTxnNum(subTxnNum).build();
+        BeginSubTxnResponse response = null;
+        int retryTime = 0;
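+        // retry only on KV_TXN_CONFLICT with backoff; other status codes break out of the loop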
+        try {
+            while (retryTime < Config.metaServiceRpcRetryTimes()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, beginSubTxnRequest:{}", retryTime, request);
+                }
+                response = MetaServiceProxy.getInstance().beginSubTxn(request);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, beginSubTxnResponse:{}", retryTime, response);
+                }
+
+                if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+                LOG.info("beginSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime);
+                backoff();
+                retryTime++;
+            }
+
+            Preconditions.checkNotNull(response);
+            Preconditions.checkNotNull(response.getStatus());
+        } catch (Exception e) {
+            LOG.warn("beginSubTxn failed, exception:", e);
+            throw new UserException("beginSubTxn failed, errMsg:" + e.getMessage());
+        }
+
+        if (response.getStatus().getCode() != MetaServiceCode.OK) {
+            throw new UserException(response.getStatus().getMsg());
+        }
+        return Pair.of(response.hasSubTxnId() ? response.getSubTxnId() : 0,
+                TxnUtil.transactionStateFromPb(response.getTxnInfo()));
+    }
+
+    public TransactionState abortSubTxn(long txnId, long subTxnId, long dbId, List<Long> tableIds, long subTxnNum)
+            throws UserException {
+        LOG.info("try to abort sub transaction, txnId: {}, subTxnId: {}, dbId: {}, tableIds: {}, subTxnNum: {}", txnId,
+                subTxnId, dbId, tableIds, subTxnNum);
+        AbortSubTxnRequest request = AbortSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id)
+                .setTxnId(txnId).setSubTxnId(subTxnId).setDbId(dbId).addAllTableIds(tableIds).setSubTxnNum(subTxnNum)
+                .build();
+        AbortSubTxnResponse response = null;
+        int retryTime = 0;
+        try {
+            while (retryTime < Config.metaServiceRpcRetryTimes()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, abortSubTxnRequest:{}", retryTime, request);
+                }
+                response = MetaServiceProxy.getInstance().abortSubTxn(request);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, abortSubTxnResponse:{}", retryTime, response);
+                }
+
+                if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+                LOG.info("abortSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime);
+                backoff();
+                retryTime++;
+            }
+
+            Preconditions.checkNotNull(response);
+            Preconditions.checkNotNull(response.getStatus());
+        } catch (Exception e) {
+            LOG.warn("abortSubTxn failed, exception:", e);
+            throw new UserException("abortSubTxn failed, errMsg:" + e.getMessage());
+        }
+
+        if (response.getStatus().getCode() != MetaServiceCode.OK) {
+            throw new UserException(response.getStatus().getMsg());
+        }
+        return TxnUtil.transactionStateFromPb(response.getTxnInfo());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 00a95a42c22..f80782bbf5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -21,10 +21,15 @@ import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
@@ -40,6 +45,7 @@ import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TTabletCommitInfo;
 import org.apache.doris.thrift.TTxnLoadInfo;
 import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.SubTransactionState.SubTransactionType;
@@ -77,6 +83,12 @@ public class TransactionEntry {
     private long transactionId = -1;
     private TransactionState transactionState;
     private long timeoutTimestamp = -1;
+    // 1. For cloud mode, we keep subTransactionStates in TransactionEntry;
+    // 2. For non-cloud mode, we keep subTransactionStates in TransactionState, because if the statement is
+    // executed on an observer, the DML statements are forwarded to the master, so the subTransactionStates
+    // must be kept on the master.
+    private List<SubTransactionState> subTransactionStates = new ArrayList<>();
+    // Used in cloud mode: counts all successful or failed sub transactions, excluding the first one
+    private long allSubTxnNum = 0;
 
     public TransactionEntry() {
     }
@@ -176,11 +188,18 @@ public class TransactionEntry {
             throw new AnalysisException(
                     "Transaction insert can not insert into values and insert 
into select at the same time");
         }
+        if (Config.isCloudMode()) {
+            OlapTable olapTable = (OlapTable) table;
+            if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) {
+                throw new UserException(
+                        "Transaction load is not supported for merge on write unique keys table in cloud mode");
+            }
+        }
         DatabaseIf database = table.getDatabase();
         if (!isTransactionBegan) {
             long timeoutSecond = ConnectContext.get().getExecTimeout();
             this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000;
-            if (Env.getCurrentEnv().isMaster()) {
+            if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                 this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                         database.getId(), Lists.newArrayList(table.getId()), label,
                         new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
@@ -206,9 +225,23 @@ public class TransactionEntry {
                 throw new AnalysisException(
                         "Transaction insert must be in the same database, 
expect db_id=" + this.database.getId());
             }
-            this.transactionState.addTableId(table.getId());
-            long subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
+            long subTxnId;
+            if (Config.isCloudMode()) {
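+                // in cloud mode the sub txn id is generated by the meta service; derive a unique label from the query id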
+                TUniqueId queryId = ConnectContext.get().queryId();
+                String label = String.format("tl_%x_%x", queryId.hi, queryId.lo);
+                List<Long> tableIds = getTableIds();
+                tableIds.add(table.getId());
+                Pair<Long, TransactionState> pair
+                        = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).beginSubTxn(
+                        transactionId, table.getDatabase().getId(), tableIds, label, allSubTxnNum);
+                this.transactionState = pair.second;
+                subTxnId = pair.first;
+            } else {
+                subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
+                this.transactionState.addTableId(table.getId());
+            }
             Env.getCurrentGlobalTransactionMgr().addSubTransaction(database.getId(), transactionId, subTxnId);
+            allSubTxnNum++;
             return subTxnId;
         }
     }
@@ -216,7 +249,7 @@ public class TransactionEntry {
     public TransactionStatus commitTransaction() throws Exception {
         if (isTransactionBegan) {
             try {
-                if (Env.getCurrentEnv().isMaster()) {
+                if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                     beforeFinishTransaction();
                     long commitTimeout = Math.min(60000L, Math.max(timeoutTimestamp - System.currentTimeMillis(), 0));
                     if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId,
@@ -275,7 +308,7 @@ public class TransactionEntry {
 
     private long abortTransaction(String reason) throws Exception {
         if (isTransactionBegan) {
-            if (Env.getCurrentEnv().isMaster()) {
+            if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                 beforeFinishTransaction();
                 Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason);
                 return transactionId;
@@ -301,7 +334,9 @@ public class TransactionEntry {
         if (isTransactionBegan) {
             List<Long> tableIds = transactionState.getTableIdList().stream().distinct().collect(Collectors.toList());
             transactionState.setTableIdList(tableIds);
-            transactionState.getSubTransactionStates().sort((s1, s2) -> {
+            List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                    : transactionState.getSubTransactionStates();
+            subTransactionStatesPtr.sort((s1, s2) -> {
                 if (s1.getSubTransactionType() == SubTransactionType.INSERT
                         && s2.getSubTransactionType() == SubTransactionType.DELETE) {
                     return 1;
@@ -312,6 +347,9 @@ public class TransactionEntry {
                     return Long.compare(s1.getSubTransactionId(), s2.getSubTransactionId());
                 }
             });
+            if (Config.isCloudMode()) {
+                transactionState.setSubTransactionStates(subTransactionStatesPtr);
+            }
             LOG.info("subTransactionStates={}", 
transactionState.getSubTransactionStates());
             transactionState.resetSubTxnIds();
         }
@@ -329,7 +367,18 @@ public class TransactionEntry {
 
     public void abortSubTransaction(long subTransactionId, Table table) {
         if (isTransactionBegan) {
-            this.transactionState.removeTableId(table.getId());
+            if (Config.isCloudMode()) {
+                try {
+                    List<Long> tableIds = getTableIds();
+                    this.transactionState
+                            = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).abortSubTxn(
+                            transactionId, subTransactionId, table.getDatabase().getId(), tableIds, allSubTxnNum);
+                } catch (UserException e) {
+                    LOG.error("Failed to remove table_id={} from txn_id={}", table.getId(), transactionId, e);
+                }
+            } else {
+                this.transactionState.removeTableId(table.getId());
+            }
             Env.getCurrentGlobalTransactionMgr().removeSubTransaction(table.getDatabase().getId(), subTransactionId);
         }
     }
@@ -340,13 +389,15 @@ public class TransactionEntry {
             LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, 
commit_infos={}",
                     label, transactionId, subTxnId, table, commitInfos);
         }
-        transactionState.getSubTransactionStates()
+        List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                : transactionState.getSubTransactionStates();
+        subTransactionStatesPtr
                 .add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType));
-        Preconditions.checkState(
-                transactionState.getTableIdList().size() == transactionState.getSubTransactionStates().size(),
-                "txn_id=" + transactionId + ", expect table_list=" + transactionState.getSubTransactionStates().stream()
-                        .map(s -> s.getTable().getId()).collect(Collectors.toList()) + ", real table_list="
-                        + transactionState.getTableIdList());
+        Preconditions.checkState(transactionState.getTableIdList().size() == subTransactionStatesPtr.size(),
+                "txn_id=" + transactionId
+                        + ", expect table_list="
+                        + subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList())
+                        + ", real table_list=" + transactionState.getTableIdList());
     }
 
     public boolean isTransactionBegan() {
@@ -442,4 +493,10 @@ public class TransactionEntry {
         LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, 
timeoutTimestamp={}",
                 label, transactionId, dbId, timeoutTimestamp);
     }
+
+    private List<Long> getTableIds() {
+        List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                : transactionState.getSubTransactionStates();
+        return subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 3d1c2d54faa..f628c945b6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -689,7 +689,7 @@ public class TransactionState implements Writable {
         sb.append(", db id: ").append(dbId);
         sb.append(", table id list: ").append(StringUtils.join(tableIdList, 
","));
         sb.append(", callback id: ").append(callbackId);
-        sb.append(", coordinator: ").append(txnCoordinator.toString());
+        sb.append(", coordinator: ").append(txnCoordinator);
         sb.append(", transaction status: ").append(transactionStatus);
         sb.append(", error replicas num: ").append(errorReplicas.size());
         sb.append(", replica ids: 
").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray()));
@@ -859,6 +859,10 @@ public class TransactionState implements Writable {
         this.subTransactionStates = new ArrayList<>();
     }
 
+    public void setSubTransactionStates(List<SubTransactionState> subTransactionStates) {
+        this.subTransactionStates = subTransactionStates;
+    }
+
     public void resetSubTxnIds() {
         this.subTxnIds = 
subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                 .collect(Collectors.toList());
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index bc57925e572..76ac8a3d306 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -378,6 +378,8 @@ message TxnInfoPB {
     optional TxnStatusPB status = 15;
     optional TxnCommitAttachmentPB commit_attachment = 16;
     optional int64 listener_id = 17; //callback id
+    // for transaction load, used for recycler
+    repeated int64 sub_txn_ids = 18;
     // TODO: There are more fields TBD
 }
 
@@ -646,6 +648,15 @@ message CommitTxnRequest {
     // merge-on-write table ids
     repeated int64 mow_table_ids = 6;
     repeated int64 base_tablet_ids= 7; // all tablet from base tables (excluding mv)
+    // for transaction load
+    optional bool is_txn_load = 9;
+    repeated SubTxnInfo sub_txn_infos = 10;
+}
+
+message SubTxnInfo {
+    optional int64 sub_txn_id = 1;
+    optional int64 table_id = 2;
+    repeated int64 base_tablet_ids= 3;
 }
 
 // corresponding to TabletStats in meta_service.h and FrontendServiceImpl.java
@@ -702,6 +713,41 @@ message GetTxnIdResponse {
     optional int64 txn_id = 2;
 }
 
+message BeginSubTxnRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 txn_id = 2;
+    // the number of sub txns begun so far, successful or failed, excluding the first one
+    optional int64 sub_txn_num = 3;
+    optional int64 db_id = 4;
+    // set table_ids in txn_info
+    repeated int64 table_ids = 5;
+    // a random label used to generate a sub_txn_id
+    optional string label = 6;
+}
+
+message BeginSubTxnResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional int64 sub_txn_id = 2;
+    optional TxnInfoPB txn_info = 3;
+}
+
+message AbortSubTxnRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 txn_id = 2;
+    // used for log
+    optional int64 sub_txn_id = 3;
+    // the number of sub txns begun so far, successful or failed, excluding the first one
+    optional int64 sub_txn_num = 4;
+    optional int64 db_id = 5;
+    // set table_ids in txn_info
+    repeated int64 table_ids = 6;
+}
+
+message AbortSubTxnResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional TxnInfoPB txn_info = 2;
+}
+
 message GetCurrentMaxTxnRequest {
     optional string cloud_unique_id = 1; // For auth
 }
@@ -1365,6 +1411,8 @@ service MetaService {
     rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse);
     rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
     rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);
+    rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse);
+    rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse);
 
     rpc get_version(GetVersionRequest) returns (GetVersionResponse);
     rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out
index 4cd97b8cb0c..257bdcc0311 100644
--- a/regression-test/data/insert_p0/txn_insert.out
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -39,52 +39,6 @@
 6
 8
 
--- !select7 --
-
--- !select8 --
-
--- !select9 --
-
--- !select1 --
-\N     \N      \N      [null]  [null, 0]
-1      2.2     abc     []      []
-2      3.3     xyz     [1]     [1, 0]
-
--- !select2 --
-\N     \N      \N      [null]  [null, 0]
-1      2.2     abc     []      []
-2      3.3     xyz     [1]     [1, 0]
-
--- !select3 --
-\N     \N      \N      [null]  [null, 0]
-1      2.2     abc     []      []
-1      2.2     abc     []      []
-1      2.2     abc     []      []
-2      3.3     xyz     [1]     [1, 0]
-2      3.3     xyz     [1]     [1, 0]
-2      3.3     xyz     [1]     [1, 0]
-
--- !select4 --
-\N     \N      \N      [null]  [null, 0]
-1      2.2     abc     []      []
-1      2.2     abc     []      []
-1      2.2     abc     []      []
-2      3.3     xyz     [1]     [1, 0]
-2      3.3     xyz     [1]     [1, 0]
-2      3.3     xyz     [1]     [1, 0]
-
--- !select5 --
-1      2
-3      4
-5      6
-7      8
-
--- !select6 --
-2
-4
-6
-8
-
 -- !select7 --
 \N     \N      \N      [null]  [null, 0]
 1      2.2     abc     []      []
@@ -521,23 +475,6 @@
 2      3.3     xyz     [1]     [1, 0]
 2      3.3     xyz     [1]     [1, 0]
 
--- !select28 --
-1      a       10
-2      b       20
-3      c       30
-
--- !select29 --
-1      a       10
-2      b       20
-3      c       30
-4      d       40
-
--- !select30 --
-1      a       11
-2      b       20
-3      c       30
-4      d       40
-
 -- !select31 --
 1      a       10
 10     a       10
@@ -958,31 +895,6 @@
 9      a       10
 9      a       10
 
--- !select38 --
-1      a       101
-
--- !select39 --
-1      a       100
-
--- !select40 --
-1      2000-01-01      1       1       1.0
-3      2000-01-03      3       3       3.0
-
--- !select41 --
-2      2000-01-20      20      20      20.0
-3      2000-01-30      30      30      30.0
-4      2000-01-04      4       4       4.0
-6      2000-01-10      10      10      10.0
-
--- !select42 --
-3      2000-01-03      3       3       3.0
-
--- !select43 --
-1      2000-01-01      1       1       1.0
-2      2000-01-02      2       2       2.0
-3      2000-01-03      3       3       3.0
-6      2000-01-10      10      10      10.0
-
 -- !select44 --
 \N     \N      \N      [null]  [null, 0]
 \N     \N      \N      [null]  [null, 0]
@@ -1244,6 +1156,49 @@
 2      3.3     xyz     [1]     [1, 0]
 2      3.3     xyz     [1]     [1, 0]
 
+-- !selectmowi0 --
+1      a       10
+2      b       20
+3      c       30
+
+-- !selectmowi1 --
+1      a       10
+2      b       20
+3      c       30
+4      d       40
+
+-- !selectmowi2 --
+1      a       11
+2      b       21
+3      c       30
+4      d       40
+5      e       50
+
+-- !selectmowu1 --
+1      a       101
+
+-- !selectmowu2 --
+1      a       100
+
+-- !selectmowd1 --
+1      2000-01-01      1       1       1.0
+3      2000-01-03      3       3       3.0
+
+-- !selectmowd2 --
+2      2000-01-20      20      20      20.0
+3      2000-01-30      30      30      30.0
+4      2000-01-04      4       4       4.0
+6      2000-01-10      10      10      10.0
+
+-- !selectmowd3 --
+3      2000-01-03      3       3       3.0
+
+-- !selectmowd4 --
+1      2000-01-01      1       1       1.0
+2      2000-01-02      2       2       2.0
+3      2000-01-03      3       3       3.0
+6      2000-01-10      10      10      10.0
+
 -- !select_cu0 --
 1      0       10
 2      0       20
diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out b/regression-test/data/insert_p0/txn_insert_inject_case.out
new file mode 100644
index 00000000000..799229be54a
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert_inject_case.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+\N     \N      \N      [null]  [null, 0]
+\N     \N      \N      [null]  [null, 0]
+1      2.2     abc     []      []
+1      2.2     abc     []      []
+2      3.3     xyz     [1]     [1, 0]
+2      3.3     xyz     [1]     [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy
index d9ccd6831cf..1d4183c8662 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -42,7 +42,7 @@ suite("txn_insert") {
         return null
     }
 
-    for (def use_nereids_planner : [false, true]) {
+    for (def use_nereids_planner : [/*false,*/ true]) {
         sql " SET enable_nereids_planner = $use_nereids_planner; "
 
         sql """ DROP TABLE IF EXISTS $table """
@@ -242,7 +242,26 @@ suite("txn_insert") {
             order_qt_select24 """select * from ${table}_2"""
         }
 
-        // 7. insert into select to same table
+        // 7. insert into tables in different database
+        if (use_nereids_planner) {
+            def db2 = "regression_test_insert_p0_1"
+            sql """ create database if not exists $db2 """
+
+            try {
+                sql """ create table ${db2}.${table} like ${table} """
+                sql """ begin; """
+                sql """ insert into ${table} select * from ${table}_0; """
+                test {
+                    sql """ insert into $db2.${table} select * from ${table}_0; """
+                    exception """Transaction insert must be in the same database, expect db_id"""
+                }
+            } finally {
+                sql """rollback"""
+                sql """ drop database if exists $db2 """
+            }
+        }
+
+        // 8. insert into select to same table
         if (use_nereids_planner) {
             sql """ begin; """
             sql """ insert into ${table}_0 select * from ${table}_1; """
@@ -273,61 +292,9 @@ suite("txn_insert") {
             }
         }
 
-        // 8. insert into tables in different database
-        if (use_nereids_planner) {
-            def db2 = "regression_test_insert_p0_1"
-            sql """ create database if not exists $db2 """
-
-            try {
-                sql """ create table ${db2}.${table} like ${table} """
-                sql """ begin; """
-                sql """ insert into ${table} select * from ${table}_0; """
-                test {
-                    sql """ insert into $db2.${table} select * from ${table}_0; """
-                    exception """Transaction insert must be in the same database, expect db_id"""
-                }
-            } finally {
-                sql """rollback"""
-                sql """ drop database if exists $db2 """
-            }
-        }
-
-        // 9. insert into mow tables
-        if (use_nereids_planner) {
-            def unique_table = "ut"
-            for (def i in 0..2) {
-                sql """ drop table if exists ${unique_table}_${i} """
-                sql """
-                    CREATE TABLE ${unique_table}_${i} (
-                        `id` int(11) NOT NULL,
-                        `name` varchar(50) NULL,
-                        `score` int(11) NULL default "-1"
-                    ) ENGINE=OLAP
-                    UNIQUE KEY(`id`, `name`)
-                    DISTRIBUTED BY HASH(`id`) BUCKETS 1
-                    PROPERTIES (
-                """ + (i == 2 ? "\"function_column.sequence_col\"='score', " : "") +
-                """
-                        "replication_num" = "1"
-                    );
-                """
-            }
-            sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 20), (3, "c", 30); """
-            sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 19), (4, "d", 40); """
-            sql """ begin """
-            sql """ insert into ${unique_table}_2 select * from ${unique_table}_0; """
-            sql """ insert into ${unique_table}_1 select * from ${unique_table}_0; """
-            sql """ insert into ${unique_table}_2 select * from ${unique_table}_1; """
-            sql """ commit; """
-            sql "sync"
-            order_qt_select28 """select * from ${unique_table}_0"""
-            order_qt_select29 """select * from ${unique_table}_1"""
-            order_qt_select30 """select * from ${unique_table}_2"""
-        }
-
-        // 10. insert into table with multi partitions and tablets
+        // 9. insert into table with multi partitions and tablets
         if (use_nereids_planner) {
-            def pt = "multi_partition_t"
+            def pt = "txn_insert_multi_partition_t"
             for (def i in 0..3) {
                 sql """ drop table if exists ${pt}_${i} """
                 sql """
@@ -380,142 +347,7 @@ suite("txn_insert") {
             sql """ set enable_insert_strict = true """
         }
 
-        // 11. update stmt
-        if (use_nereids_planner) {
-            def ut_table = "txn_insert_ut"
-            for (def i in 1..2) {
-                def tableName = ut_table + "_" + i
-                sql """ DROP TABLE IF EXISTS ${tableName} """
-                sql """
-                    CREATE TABLE ${tableName} (
-                        `ID` int(11) NOT NULL,
-                        `NAME` varchar(100) NULL,
-                        `score` int(11) NULL
-                    ) ENGINE=OLAP
-                    unique KEY(`id`)
-                    COMMENT 'OLAP'
-                    DISTRIBUTED BY HASH(`id`) BUCKETS 1
-                    PROPERTIES (
-                        "replication_num" = "1"
-                    );
-                """
-            }
-            sql """ insert into ${ut_table}_1 values(1, "a", 100); """
-            sql """ begin; """
-            sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
-            sql """ update ${ut_table}_1 set score = 101 where id = 1; """
-            sql """ commit; """
-            sql "sync"
-            order_qt_select38 """select * from ${ut_table}_1 """
-            order_qt_select39 """select * from ${ut_table}_2 """
-        }
-
-        // 12. delete from using and delete from stmt
-        if (use_nereids_planner) {
-            for (def ta in ["txn_insert_dt1", "txn_insert_dt2", 
"txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
-                sql """ drop table if exists ${ta} """
-            }
-
-            for (def ta in ["txn_insert_dt1", "txn_insert_dt4", 
"txn_insert_dt5"]) {
-                sql """
-                    create table ${ta} (
-                        id int,
-                        dt date,
-                        c1 bigint,
-                        c2 string,
-                        c3 double
-                    ) unique key (id, dt)
-                    partition by range(dt) (
-                        from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
-                    )
-                    distributed by hash(id)
-                    properties(
-                        'replication_num'='1',
-                        "enable_unique_key_merge_on_write" = "true"
-                    );
-                """
-                sql """
-                    INSERT INTO ${ta} VALUES
-                        (1, '2000-01-01', 1, '1', 1.0),
-                        (2, '2000-01-02', 2, '2', 2.0),
-                        (3, '2000-01-03', 3, '3', 3.0);
-                """
-            }
-
-            sql """
-                create table txn_insert_dt2 (
-                    id int,
-                    dt date,
-                    c1 bigint,
-                    c2 string,
-                    c3 double
-                ) unique key (id)
-                distributed by hash(id)
-                properties(
-                    'replication_num'='1'
-                );
-            """
-            sql """
-                create table txn_insert_dt3 (
-                    id int
-                ) distributed by hash(id)
-                properties(
-                    'replication_num'='1'
-                );
-            """
-            sql """
-                INSERT INTO txn_insert_dt2 VALUES
-                    (1, '2000-01-10', 10, '10', 10.0),
-                    (2, '2000-01-20', 20, '20', 20.0),
-                    (3, '2000-01-30', 30, '30', 30.0),
-                    (4, '2000-01-04', 4, '4', 4.0),
-                    (5, '2000-01-05', 5, '5', 5.0);
-            """
-            sql """
-                INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
-            """
-            sql """ begin """
-            test {
-                sql '''
-                    delete from txn_insert_dt1 temporary partition (p_20000102)
-                    using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
-                    where txn_insert_dt1.id = txn_insert_dt2.id;
-                '''
-                exception 'Partition: p_20000102 is not exists'
-            }
-            sql """
-                delete from txn_insert_dt1 partition (p_20000102)
-                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
-                where txn_insert_dt1.id = txn_insert_dt2.id;
-            """
-            sql """
-                delete from txn_insert_dt4
-                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
-                where txn_insert_dt4.id = txn_insert_dt2.id;
-            """
-            sql """
-                delete from txn_insert_dt2 where id = 1;
-            """
-            sql """
-                delete from txn_insert_dt2 where id = 5;
-            """
-            sql """
-                delete from txn_insert_dt5 partition(p_20000102) where id = 1;
-            """
-            sql """
-                delete from txn_insert_dt5 partition(p_20000102) where id = 5;
-            """
-            sql """ commit """
-            sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
-            sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
-            sql "sync"
-            order_qt_select40 """select * from txn_insert_dt1 """
-            order_qt_select41 """select * from txn_insert_dt2 """
-            order_qt_select42 """select * from txn_insert_dt4 """
-            order_qt_select43 """select * from txn_insert_dt5 """
-        }
-
-        // 13. decrease be 'pending_data_expire_time_sec' config
+        // 10. decrease be 'pending_data_expire_time_sec' config
         if (use_nereids_planner) {
             def backendId_to_params = get_be_param("pending_data_expire_time_sec")
             try {
@@ -534,7 +366,7 @@ suite("txn_insert") {
             }
         }
 
-        // 14. delete and insert
+        // 11. delete and insert
         if (use_nereids_planner) {
             sql """ begin; """
             sql """ delete from ${table}_0 where k1 = 1 or k1 = 2; """
@@ -544,7 +376,7 @@ suite("txn_insert") {
             order_qt_select45 """select * from ${table}_0"""
         }
 
-        // 15. insert and delete
+        // 12. insert and delete
         if (use_nereids_planner) {
             order_qt_select46 """select * from ${table}_1"""
             sql """ begin; """
@@ -559,7 +391,7 @@ suite("txn_insert") {
             order_qt_select48 """select * from ${table}_1"""
         }
 
-        // 16. txn insert does not commit or rollback by user, and txn is aborted because connection is closed
+        // 13. txn insert does not commit or rollback by user, and txn is aborted because connection is closed
         def dbName = "regression_test_insert_p0"
         def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
         logger.info("url: ${url}")
@@ -584,22 +416,24 @@ suite("txn_insert") {
             thread.start()
             thread.join()
             assertNotEquals(txn_id, 0)
-            def txn_state = ""
-            for (int i = 0; i < 20; i++) {
-                def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
-                logger.info("txn_info: ${txn_info}")
-                assertEquals(1, txn_info.size())
-                txn_state = txn_info[0].get("TransactionStatus")
-                if ("ABORTED" == txn_state) {
-                    break
-                } else {
-                    sleep(2000)
+            if (!isCloudMode()) {
+                def txn_state = ""
+                for (int i = 0; i < 20; i++) {
+                    def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
+                    logger.info("txn_info: ${txn_info}")
+                    assertEquals(1, txn_info.size())
+                    txn_state = txn_info[0].get("TransactionStatus")
+                    if ("ABORTED" == txn_state) {
+                        break
+                    } else {
+                        sleep(2000)
+                    }
                 }
+                assertEquals("ABORTED", txn_state)
             }
-            assertEquals("ABORTED", txn_state)
         }
 
-        // 17. txn insert does not commit or rollback by user, and txn is aborted because timeout
+        // 14. txn insert does not commit or rollback by user, and txn is aborted because timeout
         // TODO find a way to check be txn_manager is also cleaned
         if (use_nereids_planner) {
             // 1. use show transaction command to check
@@ -620,19 +454,21 @@ suite("txn_insert") {
             thread.start()
             insertLatch.await(1, TimeUnit.MINUTES)
             assertNotEquals(txn_id, 0)
-            def txn_state = ""
-            for (int i = 0; i < 20; i++) {
-                def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
-                logger.info("txn_info: ${txn_info}")
-                assertEquals(1, txn_info.size())
-                txn_state = txn_info[0].get("TransactionStatus")
-                if ("ABORTED" == txn_state) {
-                    break
-                } else {
-                    sleep(2000)
+            if (!isCloudMode()) {
+                def txn_state = ""
+                for (int i = 0; i < 20; i++) {
+                    def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
+                    logger.info("txn_info: ${txn_info}")
+                    assertEquals(1, txn_info.size())
+                    txn_state = txn_info[0].get("TransactionStatus")
+                    if ("ABORTED" == txn_state) {
+                        break
+                    } else {
+                        sleep(2000)
+                    }
                 }
+                assertEquals("ABORTED", txn_state)
             }
-            assertEquals("ABORTED", txn_state)
 
             // after the txn is timeout: do insert/ commit/ rollback
             def insert_timeout = sql """show variables where variable_name = 'insert_timeout';"""
@@ -686,7 +522,191 @@ suite("txn_insert") {
             }
         }
 
-        // 18. column update
+        // 15. insert into mow tables
+        if (use_nereids_planner) {
+            def unique_table = "txn_insert_ut"
+            for (def i in 0..2) {
+                sql """ drop table if exists ${unique_table}_${i} """
+                sql """
+                    CREATE TABLE ${unique_table}_${i} (
+                        `id` int(11) NOT NULL,
+                        `name` varchar(50) NULL,
+                        `score` int(11) NULL default "-1"
+                    ) ENGINE=OLAP
+                    UNIQUE KEY(`id`, `name`)
+                    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                    PROPERTIES (
+                """ + (i == 2 ? "\"function_column.sequence_col\"='score', " : 
"") +
+                        """
+                        "replication_num" = "1"
+                    );
+                """
+            }
+            sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 
20), (3, "c", 30); """
+            sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 
19), (4, "d", 40); """
+            sql """ insert into ${unique_table}_2 values(1, "a", 9), (2, "b", 
21), (4, "d", 39), (5, "e", 50); """
+            sql """ begin """
+            try {
+                sql """ insert into ${unique_table}_2 select * from 
${unique_table}_0; """
+                sql """ insert into ${unique_table}_1 select * from 
${unique_table}_0; """
+                sql """ insert into ${unique_table}_2 select * from 
${unique_table}_1; """
+                sql """ commit; """
+                sql "sync"
+                order_qt_selectmowi0 """select * from ${unique_table}_0"""
+                order_qt_selectmowi1 """select * from ${unique_table}_1"""
+                order_qt_selectmowi2 """select * from ${unique_table}_2"""
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                if (isCloudMode()) {
+                    assertTrue(e.getMessage().contains("Transaction load is 
not supported for merge on write unique keys table in cloud mode"))
+                    sql """ rollback """
+                } else {
+                    assertTrue(false, "should not reach here")
+                }
+            }
+        }
+
+        // the following cases are not supported in cloud mode
+        if (isCloudMode()) {
+            break
+        }
+
+        // 16. update stmt(mow table)
+        if (use_nereids_planner) {
+            def ut_table = "txn_insert_ut"
+            for (def i in 1..2) {
+                def tableName = ut_table + "_" + i
+                sql """ DROP TABLE IF EXISTS ${tableName} """
+                sql """
+                    CREATE TABLE ${tableName} (
+                        `ID` int(11) NOT NULL,
+                        `NAME` varchar(100) NULL,
+                        `score` int(11) NULL
+                    ) ENGINE=OLAP
+                    unique KEY(`id`)
+                    COMMENT 'OLAP'
+                    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                    PROPERTIES (
+                        "replication_num" = "1"
+                    );
+                """
+            }
+            sql """ insert into ${ut_table}_1 values(1, "a", 100); """
+            sql """ begin; """
+            sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
+            sql """ update ${ut_table}_1 set score = 101 where id = 1; """
+            sql """ commit; """
+            sql "sync"
+            order_qt_selectmowu1 """select * from ${ut_table}_1 """
+            order_qt_selectmowu2 """select * from ${ut_table}_2 """
+        }
+
+        // 17. delete from using and delete from stmt(mow table)
+        if (use_nereids_planner) {
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt2", 
"txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
+                sql """ drop table if exists ${ta} """
+            }
+
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt4", 
"txn_insert_dt5"]) {
+                sql """
+                    create table ${ta} (
+                        id int,
+                        dt date,
+                        c1 bigint,
+                        c2 string,
+                        c3 double
+                    ) unique key (id, dt)
+                    partition by range(dt) (
+                        from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
+                    )
+                    distributed by hash(id)
+                    properties(
+                        'replication_num'='1',
+                        "enable_unique_key_merge_on_write" = "true"
+                    );
+                """
+                sql """
+                    INSERT INTO ${ta} VALUES
+                        (1, '2000-01-01', 1, '1', 1.0),
+                        (2, '2000-01-02', 2, '2', 2.0),
+                        (3, '2000-01-03', 3, '3', 3.0);
+                """
+            }
+
+            sql """
+                create table txn_insert_dt2 (
+                    id int,
+                    dt date,
+                    c1 bigint,
+                    c2 string,
+                    c3 double
+                ) unique key (id)
+                distributed by hash(id)
+                properties(
+                    'replication_num'='1'
+                );
+            """
+            sql """
+                create table txn_insert_dt3 (
+                    id int
+                ) distributed by hash(id)
+                properties(
+                    'replication_num'='1'
+                );
+            """
+            sql """
+                INSERT INTO txn_insert_dt2 VALUES
+                    (1, '2000-01-10', 10, '10', 10.0),
+                    (2, '2000-01-20', 20, '20', 20.0),
+                    (3, '2000-01-30', 30, '30', 30.0),
+                    (4, '2000-01-04', 4, '4', 4.0),
+                    (5, '2000-01-05', 5, '5', 5.0);
+            """
+            sql """
+                INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
+            """
+            sql """ begin """
+            test {
+                sql '''
+                    delete from txn_insert_dt1 temporary partition (p_20000102)
+                    using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+                    where txn_insert_dt1.id = txn_insert_dt2.id;
+                '''
+                exception 'Partition: p_20000102 is not exists'
+            }
+            sql """
+                delete from txn_insert_dt1 partition (p_20000102)
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+                where txn_insert_dt1.id = txn_insert_dt2.id;
+            """
+            sql """
+                delete from txn_insert_dt4
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+                where txn_insert_dt4.id = txn_insert_dt2.id;
+            """
+            sql """
+                delete from txn_insert_dt2 where id = 1;
+            """
+            sql """
+                delete from txn_insert_dt2 where id = 5;
+            """
+            sql """
+                delete from txn_insert_dt5 partition(p_20000102) where id = 1;
+            """
+            sql """
+                delete from txn_insert_dt5 partition(p_20000102) where id = 5;
+            """
+            sql """ commit """
+            sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
+            sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, 
'10', 10.0) """
+            sql "sync"
+            order_qt_selectmowd1 """select * from txn_insert_dt1 """
+            order_qt_selectmowd2 """select * from txn_insert_dt2 """
+            order_qt_selectmowd3 """select * from txn_insert_dt4 """
+            order_qt_selectmowd4 """select * from txn_insert_dt5 """
+        }
+
+        // 18. column update(mow table)
         if (use_nereids_planner) {
             def unique_table = "txn_insert_cu"
             for (def i in 0..3) {
@@ -738,4 +758,14 @@ suite("txn_insert") {
             order_qt_select_cu3 """select * from ${unique_table}_3"""
         }
     }
+
+    def db_name = "regression_test_insert_p0"
+    def tables = sql """ show tables from $db_name """
+    logger.info("tables: $tables")
+    for (def table_info : tables) {
+        def table_name = table_info[0]
+        if (table_name.startsWith("txn_insert_")) {
+            check_table_version_continuous(db_name, table_name)
+        }
+    }
 }
diff --git a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
index 60cffc4d0df..3e84a9f56e8 100644
--- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
@@ -83,14 +83,20 @@ suite("txn_insert_concurrent_insert") {
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
+    def sqls = [
+            "begin",
+            "insert into ${tableName}_0 select * from ${tableName}_1 where 
L_ORDERKEY < 30000;",
+            "insert into ${tableName}_1 select * from ${tableName}_2 where 
L_ORDERKEY > 500000;",
+            "insert into ${tableName}_0 select * from ${tableName}_2 where 
L_ORDERKEY < 30000;",
+            "commit"
+    ]
     def txn_insert = { ->
         try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
              Statement stmt = conn.createStatement()) {
-            stmt.execute("begin")
-            stmt.execute("insert into ${tableName}_0 select * from 
${tableName}_1 where L_ORDERKEY < 30000;")
-            stmt.execute("insert into ${tableName}_1 select * from 
${tableName}_2 where L_ORDERKEY > 500000;")
-            stmt.execute("insert into ${tableName}_0 select * from 
${tableName}_2 where L_ORDERKEY < 30000;")
-            stmt.execute("commit")
+            for (def sql : sqls) {
+                logger.info(Thread.currentThread().getName() + " execute sql: " + sql)
+                stmt.execute(sql)
+            }
             logger.info("finish txn insert for " + 
Thread.currentThread().getName())
         } catch (Throwable e) {
             logger.error("txn insert failed", e)
@@ -112,4 +118,14 @@ suite("txn_insert_concurrent_insert") {
     result = sql """ select count() from ${tableName}_1 """
     logger.info("result: ${result}")
     assertEquals(2606192, result[0][0])
+
+    def db_name = "regression_test_insert_p0"
+    def tables = sql """ show tables from $db_name """
+    logger.info("tables: $tables")
+    for (def table_info : tables) {
+        def table_name = table_info[0]
+        if (table_name.startsWith(tableName)) {
+            check_table_version_continuous(db_name, table_name)
+        }
+    }
 }
diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
index e22c38c70af..083f01b4a8a 100644
--- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
@@ -19,10 +19,12 @@ import com.mysql.cj.jdbc.StatementImpl
 import java.sql.Connection
 import java.sql.DriverManager
 import java.sql.Statement
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
 
 suite("txn_insert_inject_case", "nonConcurrent") {
+    // test load fail
     def table = "txn_insert_inject_case"
-
     for (int j = 0; j < 3; j++) {
         def tableName = table + "_" + j
         sql """ DROP TABLE IF EXISTS $tableName """
@@ -37,9 +39,74 @@ suite("txn_insert_inject_case", "nonConcurrent") {
             properties("replication_num" = "1"); 
         """
     }
+    GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
     sql """insert into ${table}_1 values(1, 2.2, "abc", [], []), (2, 3.3, 
"xyz", [1], [1, 0]), (null, null, null, [null], [null, 0])  """
     sql """insert into ${table}_2 values(3, 2.2, "abc", [], []), (4, 3.3, 
"xyz", [1], [1, 0]), (null, null, null, [null], [null, 0])  """
 
+    def ipList = [:]
+    def portList = [:]
+    (ipList, portList) = GetDebugPoint().getBEHostAndHTTPPort()
+    logger.info("be ips: ${ipList}, ports: ${portList}")
+
+    def enableDebugPoint = { ->
+        ipList.each { beid, ip ->
+            DebugPoint.enableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error")
+        }
+    }
+
+    def disableDebugPoint = { ->
+        ipList.each { beid, ip ->
+            DebugPoint.disableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error")
+        }
+    }
+
+    try {
+        enableDebugPoint()
+        sql """ begin """
+        try {
+            sql """ insert into ${table}_0 select * from ${table}_1; """
+            assertTrue(false, "insert should fail")
+        } catch (Exception e) {
+            logger.info("1" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+        }
+        try {
+            sql """ insert into ${table}_0 select * from ${table}_1; """
+            assertTrue(false, "insert should fail")
+        } catch (Exception e) {
+            logger.info("2" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+        }
+
+        disableDebugPoint()
+        sql """ insert into ${table}_0 select * from ${table}_1; """
+
+        enableDebugPoint()
+        try {
+            sql """ insert into ${table}_0 select * from ${table}_1; """
+            assertTrue(false, "insert should fail")
+        } catch (Exception e) {
+            logger.info("4" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+        }
+
+        disableDebugPoint()
+        sql """ insert into ${table}_0 select * from ${table}_1; """
+        sql """ commit"""
+    } catch (Exception e) {
+        logger.error("failed", e)
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
+    }
+    sql "sync"
+    order_qt_select1 """select * from ${table}_0"""
+
+    if (isCloudMode()) {
+        return
+    }
+
+    sql """ truncate table ${table}_0 """
+
     // 1. publish timeout
     def backendId_to_params = get_be_param("pending_data_expire_time_sec")
     try {
diff --git a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
index 388342bad53..7481a52e940 100644
--- a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
@@ -30,13 +30,33 @@ suite("txn_insert_with_schema_change") {
     CountDownLatch insertLatch = new CountDownLatch(1)
     CountDownLatch schemaChangeLatch = new CountDownLatch(1)
 
-    def getAlterTableState = { job_state ->
+    for (int j = 0; j < 5; j++) {
+        def tableName = table + "_" + j
+        sql """ DROP TABLE IF EXISTS $tableName """
+        sql """
+            create table $tableName (
+                `ID` int(11) NOT NULL,
+                `NAME` varchar(100) NULL,
+                `score` int(11) NULL
+            ) ENGINE=OLAP
+            duplicate KEY(`id`) 
+            distributed by hash(id) buckets 1
+            properties("replication_num" = "1"); 
+        """
+    }
+    sql """ insert into ${table}_0 values(0, '0', 0) """
+    sql """ insert into ${table}_1 values(0, '0', 0) """
+    sql """ insert into ${table}_2 values(0, '0', 0) """
+    sql """ insert into ${table}_3 values(1, '1', 1), (2, '2', 2) """
+    sql """ insert into ${table}_4 values(3, '3', 3), (4, '4', 4), (5, '5', 5) 
"""
+
+    def getAlterTableState = { tName, job_state ->
         def retry = 0
         sql "use ${dbName};"
         def last_state = ""
         while (true) {
             sleep(2000)
-            def state = sql " show alter table column where tablename = 
'${table}_0' order by CreateTime desc limit 1"
+            def state = sql """ show alter table column where tablename = 
"${tName}" order by CreateTime desc limit 1"""
             logger.info("alter table state: ${state}")
             last_state = state[0][9]
             if (state.size() > 0 && last_state == job_state) {
@@ -53,13 +73,17 @@ suite("txn_insert_with_schema_change") {
     def txnInsert = { sqls ->
         try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
              Statement statement = conn.createStatement()) {
+            logger.info("execute sql: begin")
             statement.execute("begin")
+            logger.info("execute sql: ${sqls[0]}")
             statement.execute(sqls[0])
 
             schemaChangeLatch.countDown()
-            insertLatch.await(2, TimeUnit.MINUTES)
+            insertLatch.await(5, TimeUnit.MINUTES)
 
+            logger.info("execute sql: ${sqls[1]}")
             statement.execute(sqls[1])
+            logger.info("execute sql: commit")
             statement.execute("commit")
         } catch (Throwable e) {
             logger.error("txn insert failed", e)
@@ -67,13 +91,14 @@ suite("txn_insert_with_schema_change") {
         }
     }
 
-    def schemaChange = { sql, job_state ->
+    def schemaChange = { sql, tName, job_state ->
         try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
              Statement statement = conn.createStatement()) {
-            schemaChangeLatch.await(2, TimeUnit.MINUTES)
+            schemaChangeLatch.await(5, TimeUnit.MINUTES)
+            logger.info("execute sql: ${sql}")
             statement.execute(sql)
             if (job_state != null) {
-                getAlterTableState(job_state)
+                getAlterTableState(tName, job_state)
             }
             insertLatch.countDown()
         } catch (Throwable e) {
@@ -83,32 +108,20 @@ suite("txn_insert_with_schema_change") {
     }
 
     def sqls = [
-            ["insert into ${table}_0(id, name, score) select * from 
${table}_1;",
-             "insert into ${table}_0(id, name, score) select * from 
${table}_2;"],
-            ["delete from ${table}_0 where id = 0 or id = 3;",
-             "insert into ${table}_0(id, name, score) select * from 
${table}_2;"],
-            ["insert into ${table}_0(id, name, score) select * from 
${table}_2;",
-             "delete from ${table}_0 where id = 0 or id = 3;"]
+            ["insert into ${table}_0(id, name, score) select * from 
${table}_3;",
+             "insert into ${table}_0(id, name, score) select * from 
${table}_4;"],
+            ["delete from ${table}_1 where id = 0 or id = 3;",
+             "insert into ${table}_1(id, name, score) select * from 
${table}_4;"],
+            ["insert into ${table}_2(id, name, score) select * from 
${table}_4;",
+             "delete from ${table}_2 where id = 0 or id = 3;"]
     ]
 
-    for (def insert_sqls: sqls) {
-        for (int j = 0; j < 3; j++) {
-            def tableName = table + "_" + j
-            sql """ DROP TABLE IF EXISTS $tableName force """
-            sql """
-            create table $tableName (
-                `ID` int(11) NOT NULL,
-                `NAME` varchar(100) NULL,
-                `score` int(11) NULL
-            ) ENGINE=OLAP
-            duplicate KEY(`id`) 
-            distributed by hash(id) buckets 1
-            properties("replication_num" = "1"); 
-        """
+    for (int i = 0; i < sqls.size(); i++) {
+        def insert_sqls = sqls[i]
+        // TODO skip because it will cause a meta service core dump
+        if (isCloudMode() && insert_sqls[1].startsWith("delete")) {
+            continue
         }
-        sql """ insert into ${table}_0 values(0, '0', 0) """
-        sql """ insert into ${table}_1 values(1, '1', 1), (2, '2', 2) """
-        sql """ insert into ${table}_2 values(3, '3', 3), (4, '4', 4), (5, 
'5', 5) """
 
         logger.info("insert sqls: ${insert_sqls}")
         // 1. do light weight schema change: add column
@@ -116,7 +129,7 @@ suite("txn_insert_with_schema_change") {
             insertLatch = new CountDownLatch(1)
             schemaChangeLatch = new CountDownLatch(1)
             Thread insert_thread = new Thread(() -> txnInsert(insert_sqls))
-            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_0 ADD column age int after name;", null))
+            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_${i} ADD column age int after name;", "${table}_${i}", null))
             insert_thread.start()
             schema_change_thread.start()
             insert_thread.join()
@@ -124,9 +137,9 @@ suite("txn_insert_with_schema_change") {
 
             logger.info("errors: " + errors)
             assertEquals(0, errors.size())
-            order_qt_select1 """select id, name, score from ${table}_0 """
-            getAlterTableState("FINISHED")
-            order_qt_select2 """select id, name, score from ${table}_0 """
+            order_qt_select1 """select id, name, score from ${table}_${i} """
+            getAlterTableState("${table}_${i}", "FINISHED")
+            order_qt_select2 """select id, name, score from ${table}_${i} """
         }
 
         // 2. do hard weight schema change: change order
@@ -134,7 +147,7 @@ suite("txn_insert_with_schema_change") {
             insertLatch = new CountDownLatch(1)
             schemaChangeLatch = new CountDownLatch(1)
             Thread insert_thread = new Thread(() -> txnInsert(insert_sqls))
-            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_0 order by (id, score, age, name);", "WAITING_TXN"))
+            Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table}_${i} order by (id, score, age, name);", "${table}_${i}", "WAITING_TXN"))
             insert_thread.start()
             schema_change_thread.start()
             insert_thread.join()
@@ -142,9 +155,10 @@ suite("txn_insert_with_schema_change") {
 
             logger.info("errors: " + errors)
             assertEquals(0, errors.size())
-            order_qt_select3 """select id, name, score from ${table}_0 """
-            getAlterTableState("FINISHED")
-            order_qt_select4 """select id, name, score from ${table}_0 """
+            order_qt_select3 """select id, name, score from ${table}_${i} """
+            getAlterTableState("${table}_${i}", "FINISHED")
+            order_qt_select4 """select id, name, score from ${table}_${i} """
         }
+        check_table_version_continuous(dbName, table + "_" + i)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

