This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d4c0a9f6f5 [feat](cloud) commit_txn supports versioned read (#54643)
4d4c0a9f6f5 is described below

commit 4d4c0a9f6f56495a1c194c6bf49ce543dd2221e6
Author: walter <[email protected]>
AuthorDate: Wed Aug 13 20:11:08 2025 +0800

    [feat](cloud) commit_txn supports versioned read (#54643)
---
 cloud/src/meta-service/meta_service_txn.cpp     | 148 ++++++++---
 cloud/src/meta-service/txn_lazy_committer.cpp   |  77 +++++-
 cloud/src/meta-store/meta_reader.cpp            |  47 ++++
 cloud/src/meta-store/meta_reader.h              |  25 ++
 cloud/test/meta_reader_test.cpp                 | 296 ++++++++++++++++++++++
 cloud/test/meta_service_test.cpp                |   5 +-
 cloud/test/meta_service_versioned_read_test.cpp | 312 ++++++++++++++++++++++++
 cloud/test/txn_lazy_commit_test.cpp             | 123 ++++++++++
 8 files changed, 988 insertions(+), 45 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 07049ca9639..83f1bf909df 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -34,6 +34,7 @@
 #include "meta-service/meta_service_tablet_stats.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
 #include "meta-store/txn_kv.h"
 #include "meta-store/txn_kv_error.h"
 #include "meta-store/versioned_value.h"
@@ -1143,6 +1144,11 @@ void MetaServiceImpl::commit_txn_immediately(
         TxnErrorCode& err, KVStats& stats) {
     std::stringstream ss;
     int64_t txn_id = request->txn_id();
+
+    bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
+    MetaReader meta_reader(instance_id, txn_kv_.get());
+
     do {
         TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
         int64_t last_pending_txn_id = 0;
@@ -1234,16 +1240,24 @@ void MetaServiceImpl::commit_txn_immediately(
         // Prepare rowset meta and new_versions
         AnnotateTag txn_tag("txn_id", txn_id);
         std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
-        {
-            auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
-                    std::ranges::ref_view(tmp_rowsets_meta) |
-                    std::ranges::views::transform(
-                            [](const auto& pair) { return 
pair.second.tablet_id(); }));
+        auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
+                std::ranges::ref_view(tmp_rowsets_meta) |
+                std::ranges::views::transform(
+                        [](const auto& pair) { return pair.second.tablet_id(); 
}));
+        if (!is_versioned_read) {
             std::tie(code, msg) =
                     get_tablet_indexes(txn.get(), &tablet_ids, instance_id, 
acquired_tablet_ids);
             if (code != MetaServiceCode::OK) {
                 return;
             }
+        } else {
+            err = meta_reader.get_tablet_indexes(txn.get(), 
acquired_tablet_ids, &tablet_ids);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get tablet indexes, err={}", err);
+                LOG_WARNING(msg);
+                return;
+            }
         }
 
         std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> 
partition_indexes;
@@ -1256,10 +1270,23 @@ void MetaServiceImpl::commit_txn_immediately(
 
         // {table/partition} -> version
         std::unordered_map<int64_t, int64_t> versions;
-        std::tie(code, msg) = get_partition_versions(txn.get(), &versions, 
&last_pending_txn_id,
-                                                     instance_id, 
partition_indexes);
-        if (code != MetaServiceCode::OK) {
-            return;
+        if (!is_versioned_read) {
+            std::tie(code, msg) = get_partition_versions(txn.get(), &versions, 
&last_pending_txn_id,
+                                                         instance_id, 
partition_indexes);
+            if (code != MetaServiceCode::OK) {
+                return;
+            }
+        } else {
+            std::vector<int64_t> partition_ids = 
to_container<std::vector<int64_t>>(
+                    std::ranges::ref_view(partition_indexes) | 
std::ranges::views::keys);
+            err = meta_reader.get_partition_versions(txn.get(), partition_ids, 
&versions,
+                                                     &last_pending_txn_id);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get partition versions, err={}", 
err);
+                LOG_WARNING(msg);
+                return;
+            }
         }
 
         if (last_pending_txn_id > 0) {
@@ -1298,6 +1325,10 @@ void MetaServiceImpl::commit_txn_immediately(
                 ss << "failed to get partition version key, the target version 
not exists in "
                       "versions."
                    << " txn_id=" << txn_id << " partition_id=" << partition_id;
+                ss << " versions";
+                for (const auto& [pid, ver] : versions) {
+                    ss << " partition_id=" << pid << " version=" << ver;
+                }
                 msg = ss.str();
                 LOG(ERROR) << msg;
                 return;
@@ -1334,9 +1365,6 @@ void MetaServiceImpl::commit_txn_immediately(
             return;
         }
 
-        bool is_versioned_write = is_version_write_enabled(instance_id);
-        bool is_versioned_read = is_version_read_enabled(instance_id);
-
         // Save rowset meta
         for (auto& i : rowsets) {
             auto [tablet_id, version] = i.first;
@@ -1679,6 +1707,10 @@ void MetaServiceImpl::commit_txn_eventually(
     TxnErrorCode err = TxnErrorCode::TXN_OK;
     int64_t txn_id = request->txn_id();
 
+    bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
+    MetaReader meta_reader(instance_id, txn_kv_.get());
+
     do {
         TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
         int64_t last_pending_txn_id = 0;
@@ -1705,16 +1737,24 @@ void MetaServiceImpl::commit_txn_eventually(
 
         // tablet_id -> {table/index/partition}_id
         std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
-        {
-            auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
-                    std::ranges::ref_view(tmp_rowsets_meta) |
-                    std::ranges::views::transform(
-                            [](const auto& pair) { return 
pair.second.tablet_id(); }));
+        auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
+                std::ranges::ref_view(tmp_rowsets_meta) |
+                std::ranges::views::transform(
+                        [](const auto& pair) { return pair.second.tablet_id(); 
}));
+        if (!is_versioned_read) {
             std::tie(code, msg) =
                     get_tablet_indexes(txn.get(), &tablet_ids, instance_id, 
acquired_tablet_ids);
             if (code != MetaServiceCode::OK) {
                 return;
             }
+        } else {
+            err = meta_reader.get_tablet_indexes(txn.get(), 
acquired_tablet_ids, &tablet_ids);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get tablet indexes, err={}", err);
+                LOG_WARNING(msg);
+                return;
+            }
         }
 
         bool need_repair_tablet_idx =
@@ -1750,10 +1790,23 @@ void MetaServiceImpl::commit_txn_eventually(
         }
 
         std::unordered_map<int64_t, int64_t> versions;
-        std::tie(code, msg) = get_partition_versions(txn.get(), &versions, 
&last_pending_txn_id,
-                                                     instance_id, 
partition_indexes);
-        if (code != MetaServiceCode::OK) {
-            return;
+        if (!is_versioned_read) {
+            std::tie(code, msg) = get_partition_versions(txn.get(), &versions, 
&last_pending_txn_id,
+                                                         instance_id, 
partition_indexes);
+            if (code != MetaServiceCode::OK) {
+                return;
+            }
+        } else {
+            std::vector<int64_t> partition_ids = 
to_container<std::vector<int64_t>>(
+                    std::ranges::ref_view(partition_indexes) | 
std::ranges::views::keys);
+            err = meta_reader.get_partition_versions(txn.get(), partition_ids, 
&versions,
+                                                     &last_pending_txn_id);
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::READ>(err);
+                msg = fmt::format("failed to get partition versions, err={}", 
err);
+                LOG_WARNING(msg);
+                return;
+            }
         }
 
         TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id",
@@ -1859,8 +1912,6 @@ void MetaServiceImpl::commit_txn_eventually(
         // lazy commit task will advance txn to make txn visible
         txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);
 
-        bool is_versioned_write = is_version_write_enabled(instance_id);
-        bool is_versioned_read = is_version_read_enabled(instance_id);
         txn_info.set_versioned_write(is_versioned_write);
         txn_info.set_versioned_read(is_versioned_read);
 
@@ -2232,21 +2283,34 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
 
     AnnotateTag txn_tag("txn_id", txn_id);
 
+    bool is_versioned_write = is_version_write_enabled(instance_id);
+    bool is_versioned_read = is_version_read_enabled(instance_id);
+    MetaReader meta_reader(instance_id, txn_kv_.get());
+
     // Prepare rowset meta and new_versions
     std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
-    {
-        // Read tablet indexes in batch.
-        std::vector<int64_t> acquired_tablet_ids;
-        for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
-            for (const auto& [_, i] : tmp_rowsets_meta) {
-                acquired_tablet_ids.push_back(i.tablet_id());
-            }
+    std::vector<int64_t> acquired_tablet_ids;
+    for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+        for (const auto& [_, i] : tmp_rowsets_meta) {
+            acquired_tablet_ids.push_back(i.tablet_id());
         }
+    }
+    if (!is_versioned_read) {
+        // Read tablet indexes in batch.
         std::tie(code, msg) =
                 get_tablet_indexes(txn.get(), &tablet_ids, instance_id, 
acquired_tablet_ids);
         if (code != MetaServiceCode::OK) {
             return;
         }
+    } else {
+        TxnErrorCode err =
+                meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, 
&tablet_ids);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get tablet indexes, err={}", err);
+            LOG_WARNING(msg);
+            return;
+        }
     }
 
     // {table/partition} -> version
@@ -2260,10 +2324,23 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
 
     // FIXME: handle pengding txn id in commit_txn_with_sub_txn.
     int64_t last_pending_txn_id = 0;
-    std::tie(code, msg) = get_partition_versions(txn.get(), &new_versions, 
&last_pending_txn_id,
-                                                 instance_id, 
partition_indexes);
-    if (code != MetaServiceCode::OK) {
-        return;
+    if (!is_versioned_read) {
+        std::tie(code, msg) = get_partition_versions(txn.get(), &new_versions, 
&last_pending_txn_id,
+                                                     instance_id, 
partition_indexes);
+        if (code != MetaServiceCode::OK) {
+            return;
+        }
+    } else {
+        std::vector<int64_t> partition_ids = 
to_container<std::vector<int64_t>>(
+                std::ranges::ref_view(partition_indexes) | 
std::ranges::views::keys);
+        err = meta_reader.get_partition_versions(txn.get(), partition_ids, 
&new_versions,
+                                                 &last_pending_txn_id);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get partition versions, err={}", err);
+            LOG_WARNING(msg);
+            return;
+        }
     }
 
     CommitTxnLogPB commit_txn_log;
@@ -2333,9 +2410,6 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
         return;
     }
 
-    bool is_versioned_write = is_version_write_enabled(instance_id);
-    bool is_versioned_read = is_version_read_enabled(instance_id);
-
     // Save rowset meta
     for (auto& i : rowsets) {
         auto [tablet_id, version] = i.first;
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp
index ae5671bfb4a..9ef38ec9951 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -30,6 +30,8 @@
 #include "meta-service/meta_service_tablet_stats.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
+#include "meta-store/txn_kv_error.h"
 #include "meta-store/versioned_value.h"
 
 using namespace std::chrono;
@@ -137,6 +139,8 @@ void convert_tmp_rowsets(
         return;
     }
 
+    MetaReader meta_reader(instance_id, txn_kv.get());
+
     // partition_id -> VersionPB
     std::unordered_map<int64_t, VersionPB> partition_versions;
     // tablet_id -> stats
@@ -186,7 +190,18 @@ void convert_tmp_rowsets(
                     return;
                 }
             } else {
-                CHECK(false) << "versioned read is not supported yet";
+                err = meta_reader.get_tablet_index(txn.get(), 
tmp_rowset_pb.tablet_id(),
+                                                   &tablet_idx_pb);
+                if (err != TxnErrorCode::TXN_OK) {
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get tablet index, txn_id=" << txn_id
+                       << " tablet_id=" << tmp_rowset_pb.tablet_id() << " 
err=" << err;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
             }
             tablet_ids.emplace(tmp_rowset_pb.tablet_id(), tablet_idx_pb);
         }
@@ -219,7 +234,20 @@ void convert_tmp_rowsets(
                 LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
                           << " version_pb:" << version_pb.ShortDebugString();
             } else {
-                CHECK(false) << "versioned read is not supported yet";
+                err = meta_reader.get_partition_version(txn.get(), 
tmp_rowset_pb.partition_id(),
+                                                        &version_pb, nullptr);
+                if (err != TxnErrorCode::TXN_OK) {
+                    code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                   ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                   : cast_as<ErrCategory::READ>(err);
+                    ss << "failed to get partition version, txn_id=" << txn_id
+                       << " partition_id=" << tmp_rowset_pb.partition_id() << 
" err=" << err;
+                    msg = ss.str();
+                    LOG(WARNING) << msg;
+                    return;
+                }
+                LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << 
tmp_rowset_pb.partition_id()
+                          << " version_pb:" << version_pb.ShortDebugString();
             }
             partition_versions.emplace(tmp_rowset_pb.partition_id(), 
version_pb);
             DCHECK_EQ(partition_versions.size(), 1) << 
partition_versions.size();
@@ -249,7 +277,22 @@ void convert_tmp_rowsets(
             }
             DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
         } else {
-            CHECK(true) << "versioned read is not supported yet";
+            int64_t tablet_id = tmp_rowset_pb.tablet_id();
+            RowsetMetaCloudPB rowset_val_pb;
+            err = meta_reader.get_load_rowset_meta(txn.get(), tablet_id, 
version, &rowset_val_pb);
+            if (TxnErrorCode::TXN_OK == err) {
+                // tmp rowset key has been converted
+                continue;
+            }
+            if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get load_rowset_key, txn_id=" << txn_id
+                   << " tablet_id=" << tablet_id << " version=" << version << 
" err=" << err;
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
         }
 
         tmp_rowset_pb.set_start_version(version);
@@ -471,6 +514,7 @@ void TxnLazyCommitTask::commit() {
 
     bool is_versioned_write = txn_info.versioned_write();
     bool is_versioned_read = txn_info.versioned_read();
+    MetaReader meta_reader(instance_id_, txn_kv_.get());
 
     std::stringstream ss;
     int retry_times = 0;
@@ -589,7 +633,20 @@ void TxnLazyCommitTask::commit() {
                         }
                         table_id = tablet_idx_pb.table_id();
                     } else {
-                        CHECK(false) << "versioned read is not supported yet";
+                        TabletIndexPB tablet_idx_pb;
+                        int64_t first_tablet_id = 
tmp_rowset_metas.begin()->second.tablet_id();
+                        err = meta_reader.get_tablet_index(first_tablet_id, 
&tablet_idx_pb);
+                        if (err != TxnErrorCode::TXN_OK) {
+                            code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                            ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                            : cast_as<ErrCategory::READ>(err);
+                            ss << "failed to get tablet index, txn_id=" << 
txn_id_
+                               << " tablet_id=" << first_tablet_id << " err=" 
<< err;
+                            msg_ = ss.str();
+                            LOG(WARNING) << msg_;
+                            break;
+                        }
+                        table_id = tablet_idx_pb.table_id();
                     }
                 }
 
@@ -620,7 +677,17 @@ void TxnLazyCommitTask::commit() {
                         break;
                     }
                 } else {
-                    CHECK(false) << "versioned read is not supported yet";
+                    err = meta_reader.get_partition_version(partition_id, 
&version_pb, nullptr);
+                    if (TxnErrorCode::TXN_OK != err) {
+                        code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+                                        ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                        : cast_as<ErrCategory::READ>(err);
+                        ss << "failed to get versioned partiton version, 
txn_id=" << txn_id_
+                           << " partition_id=" << partition_id << " err=" << 
err;
+                        msg_ = ss.str();
+                        LOG(WARNING) << msg_;
+                        break;
+                    }
                 }
 
                 if (version_pb.pending_txn_ids_size() > 0 &&
diff --git a/cloud/src/meta-store/meta_reader.cpp b/cloud/src/meta-store/meta_reader.cpp
index d1b32a23a30..85123dd9e0c 100644
--- a/cloud/src/meta-store/meta_reader.cpp
+++ b/cloud/src/meta-store/meta_reader.cpp
@@ -363,6 +363,34 @@ TxnErrorCode MetaReader::get_partition_versions(
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode MetaReader::get_partition_versions(Transaction* txn,  // int64 flavour: wraps the VersionPB-based overload
+                                                const std::vector<int64_t>& 
partition_ids,
+                                                std::unordered_map<int64_t, 
int64_t>* versions,
+                                                int64_t* last_pending_txn_id, 
bool snapshot) {
+    std::unordered_map<int64_t, VersionPB> version_pb_map;  // partition_id -> VersionPB, filled by the call below
+    TxnErrorCode err =
+            get_partition_versions(txn, partition_ids, &version_pb_map, 
nullptr, snapshot);  // nullptr: the versionstamps output is not requested
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;  // *versions and *last_pending_txn_id are left untouched on failure
+    }
+
+    for (int64_t partition_id : partition_ids) {
+        auto it = version_pb_map.find(partition_id);
+        if (it == version_pb_map.end()) {
+            versions->emplace(partition_id, 1);  // id absent from version_pb_map: report version 1
+        } else {
+            const VersionPB& version_pb = it->second;
+            int64_t version = version_pb.version();
+            versions->emplace(partition_id, version);  // emplace keeps an entry already present in *versions
+            if (last_pending_txn_id && version_pb.pending_txn_ids_size() > 0) {  // out-param may be nullptr
+                *last_pending_txn_id = version_pb.pending_txn_ids(0);  // first pending id only; a later partition with pending txns overwrites it
+            }
+        }
+    }
+
+    return TxnErrorCode::TXN_OK;
+}
+
 TxnErrorCode MetaReader::get_rowset_metas(int64_t tablet_id, int64_t 
start_version,
                                           int64_t end_version,
                                           std::vector<RowsetMetaCloudPB>* 
rowset_metas,
@@ -470,6 +498,25 @@ TxnErrorCode MetaReader::get_rowset_metas(Transaction* txn, int64_t tablet_id,
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode MetaReader::get_load_rowset_meta(int64_t tablet_id, int64_t 
version,
+                                              RowsetMetaCloudPB* rowset_meta, 
bool snapshot) {
+    std::unique_ptr<Transaction> txn;  // created only for this read; this function never commits it
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;  // transaction could not be created; no read is attempted
+    }
+    return get_load_rowset_meta(txn.get(), tablet_id, version, rowset_meta, 
snapshot);  // delegates to the Transaction* overload and returns its result
+}
+
+TxnErrorCode MetaReader::get_load_rowset_meta(Transaction* txn, int64_t 
tablet_id, int64_t version,
+                                              RowsetMetaCloudPB* rowset_meta, 
bool snapshot) {
+    std::string load_rowset_key =  // key is built from (instance_id_, tablet_id, version)
+            versioned::meta_rowset_load_key({instance_id_, tablet_id, 
version});
+    Versionstamp versionstamp;  // receives document_get's versionstamp output; not returned to the caller
+    return versioned::document_get(txn, load_rowset_key, snapshot_version_, 
rowset_meta,  // NOTE(review): snapshot_version_ is forwarded as-is; its meaning is defined by document_get (not shown here)
+                                   &versionstamp, snapshot);  // document_get's TxnErrorCode is returned unchanged
+}
+
 TxnErrorCode MetaReader::get_tablet_indexes(
         const std::vector<int64_t>& tablet_ids,
         std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes, bool 
snapshot) {
diff --git a/cloud/src/meta-store/meta_reader.h b/cloud/src/meta-store/meta_reader.h
index 3ee2e549b76..43a26f1904e 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -125,6 +125,17 @@ public:
         return get_partition_versions(txn, partition_ids, versions, 
versionstamps, snapshot_);
     }
 
+    // Get the partition versions, as plain int64 values, for the given partition_ids.
+    // If the partition id is not found, it will be set to 1 in the versions 
map.
+    TxnErrorCode get_partition_versions(Transaction* txn, const 
std::vector<int64_t>& partition_ids,
+                                        std::unordered_map<int64_t, int64_t>* 
versions,
+                                        int64_t* last_pending_txn_id, bool 
snapshot);  // last_pending_txn_id may be nullptr; otherwise it receives pending_txn_ids(0) of a partition that has pending txns
+    TxnErrorCode get_partition_versions(Transaction* txn, const 
std::vector<int64_t>& partition_ids,
+                                        std::unordered_map<int64_t, int64_t>* 
versions,
+                                        int64_t* last_pending_txn_id) {
+        return get_partition_versions(txn, partition_ids, versions, 
last_pending_txn_id, snapshot_);  // same call, with the member snapshot_ supplied as the snapshot flag
+    }
+
     // Get the tablet load stats for the given tablet
     //
     // If the `tablet_stats` is not nullptr, it will be filled with the 
deserialized TabletStatsPB.
@@ -227,6 +238,20 @@ public:
                                 snapshot_);
     }
 
+    // Get the load rowset meta for the given tablet_id and version.
+    // Overloads without `txn` create their own transaction; overloads without `snapshot` pass the member snapshot_.
+    TxnErrorCode get_load_rowset_meta(int64_t tablet_id, int64_t version,
+                                      RowsetMetaCloudPB* rowset_meta, bool 
snapshot);
+    TxnErrorCode get_load_rowset_meta(Transaction* txn, int64_t tablet_id, 
int64_t version,
+                                      RowsetMetaCloudPB* rowset_meta, bool 
snapshot);
+    TxnErrorCode get_load_rowset_meta(int64_t tablet_id, int64_t version,
+                                      RowsetMetaCloudPB* rowset_meta) {
+        return get_load_rowset_meta(tablet_id, version, rowset_meta, 
snapshot_);  // forwards to the bool-snapshot overload declared above
+    }
+    TxnErrorCode get_load_rowset_meta(Transaction* txn, int64_t tablet_id, 
int64_t version,
+                                      RowsetMetaCloudPB* rowset_meta) {
+        return get_load_rowset_meta(txn, tablet_id, version, rowset_meta, 
snapshot_);  // forwards to the Transaction* + bool-snapshot overload declared above
+    }
+
     // Get the tablet meta keys.
     TxnErrorCode get_tablet_meta(int64_t tablet_id, TabletMetaCloudPB* 
tablet_meta,
                                  Versionstamp* versionstamp, bool snapshot);
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index c30ae9a84bb..926ee3e6bd7 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -390,6 +390,166 @@ TEST(MetaReaderTest, BatchGetPartitionVersion) {
     }
 }
 
+TEST(MetaReaderTest, GetPartitionVersionsWithPendingTxn) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string instance_id = "test_instance";
+    std::vector<int64_t> partition_ids = {2101, 2102, 2103, 2104};
+
+    {
+        // Test empty input: no versions are returned and last_pending_txn_id keeps its initial -1
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        std::vector<int64_t> empty_ids;
+        std::unordered_map<int64_t, int64_t> versions;
+        int64_t last_pending_txn_id = -1;
+        TxnErrorCode err = meta_reader.get_partition_versions(txn.get(), 
empty_ids, &versions,
+                                                              
&last_pending_txn_id);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_TRUE(versions.empty());
+        ASSERT_EQ(last_pending_txn_id, -1);
+    }
+
+    {
+        // Put some partition versions (skip partition_ids[1] to test partial 
results)
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        for (size_t i = 0; i < partition_ids.size(); ++i) {
+            if (i == 1) continue; // Skip partition_ids[1]
+            std::string partition_version_key =
+                    versioned::partition_version_key({instance_id, 
partition_ids[i]});
+            VersionPB version_pb;
+            version_pb.set_version(100 + i * 10); // Different versions: 100, 
120, 130
+            // Add two pending transactions for partition_ids[2]; only the first (3001) is asserted on below
+            if (i == 2) {
+                version_pb.add_pending_txn_ids(3001);
+                version_pb.add_pending_txn_ids(3002);
+            }
+            versioned_put(txn.get(), partition_version_key, 
version_pb.SerializeAsString());
+        }
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        // Test partial results - partition not found should be set to 1
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        std::unordered_map<int64_t, int64_t> versions;
+        int64_t last_pending_txn_id = -1;
+        TxnErrorCode err = meta_reader.get_partition_versions(txn.get(), 
partition_ids, &versions,
+                                                              
&last_pending_txn_id);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(versions.size(), 4); // All partition_ids, missing one 
should be set to 1
+
+        for (size_t i = 0; i < partition_ids.size(); ++i) {
+            int64_t partition_id = partition_ids[i];
+            ASSERT_NE(versions.find(partition_id), versions.end());
+            if (i == 1) {
+                // Missing partition should be set to 1
+                ASSERT_EQ(versions[partition_id], 1);
+            } else {
+                ASSERT_EQ(versions[partition_id], 100 + i * 10);
+            }
+        }
+        // Should return first pending transaction ID from partition_ids[2]
+        ASSERT_EQ(last_pending_txn_id, 3001);
+    }
+
+    {
+        // Put the missing partition version without pending transaction
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string partition_version_key =
+                versioned::partition_version_key({instance_id, 
partition_ids[1]});
+        VersionPB version_pb;
+        version_pb.set_version(110);
+        versioned_put(txn.get(), partition_version_key, 
version_pb.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        // Test all keys found; partition_ids[2] still carries its pending transactions at this point
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        std::unordered_map<int64_t, int64_t> versions;
+        int64_t last_pending_txn_id = -1;
+        TxnErrorCode err = meta_reader.get_partition_versions(txn.get(), 
partition_ids, &versions,
+                                                              
&last_pending_txn_id);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(versions.size(), partition_ids.size());
+
+        for (size_t i = 0; i < partition_ids.size(); ++i) {
+            int64_t partition_id = partition_ids[i];
+            ASSERT_NE(versions.find(partition_id), versions.end());
+            int32_t expected_version = (i == 1) ? 110 : 100 + i * 10;
+            ASSERT_EQ(versions[partition_id], expected_version);
+        }
+        // Should still return first pending transaction ID from 
partition_ids[2]
+        ASSERT_EQ(last_pending_txn_id, 3001);
+    }
+
+    {
+        // Remove pending transactions from partition_ids[2]
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string partition_version_key =
+                versioned::partition_version_key({instance_id, 
partition_ids[2]});
+        VersionPB version_pb;
+        version_pb.set_version(120);
+        // No pending transactions this time
+        versioned_put(txn.get(), partition_version_key, 
version_pb.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        // Test with no pending transactions
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        std::unordered_map<int64_t, int64_t> versions;
+        int64_t last_pending_txn_id = -1;
+        TxnErrorCode err = meta_reader.get_partition_versions(txn.get(), 
partition_ids, &versions,
+                                                              
&last_pending_txn_id);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(versions.size(), partition_ids.size());
+
+        for (size_t i = 0; i < partition_ids.size(); ++i) {
+            int64_t partition_id = partition_ids[i];
+            ASSERT_NE(versions.find(partition_id), versions.end());
+            int32_t expected_version = (i == 1) ? 110 : (i == 2) ? 120 : 100 + 
i * 10;
+            ASSERT_EQ(versions[partition_id], expected_version);
+        }
+        // No pending transactions, so last_pending_txn_id should remain -1
+        ASSERT_EQ(last_pending_txn_id, -1);
+    }
+
+    {
+        // Repeat the read with a fresh reader; this is the same no-`snapshot` overload used by every call above
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        std::unordered_map<int64_t, int64_t> versions;
+        int64_t last_pending_txn_id = -1;
+        TxnErrorCode err = meta_reader.get_partition_versions(txn.get(), 
partition_ids, &versions,
+                                                              
&last_pending_txn_id);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(versions.size(), partition_ids.size());
+        // Expect the same versions as the previous read (no call in this test passes `snapshot` explicitly)
+        for (size_t i = 0; i < partition_ids.size(); ++i) {
+            int64_t partition_id = partition_ids[i];
+            ASSERT_NE(versions.find(partition_id), versions.end());
+            int32_t expected_version = (i == 1) ? 110 : (i == 2) ? 120 : 100 + 
i * 10;
+            ASSERT_EQ(versions[partition_id], expected_version);
+        }
+        ASSERT_EQ(last_pending_txn_id, -1);
+    }
+}
+
 TEST(MetaReaderTest, GetTabletLoadStats) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);
@@ -1300,3 +1460,139 @@ TEST(MetaReaderTest, GetPartitionIndex) {
         ASSERT_EQ(partition_index.table_id(), 200);
     }
 }
+
+// Exercises MetaReader::get_load_rowset_meta: missing key, read with an internally created
+// transaction, read with a caller-provided transaction, read at an older snapshot version,
+// read with the snapshot flag, and a lookup of a version that was never written.
+TEST(MetaReaderTest, GetLoadRowsetMeta) {
+    using doris::RowsetMetaCloudPB;
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string instance_id = "test_instance";
+    int64_t tablet_id = 5001;
+    int64_t version = 10;
+
+    {
+        // Test key not found when no rowset exists
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+
+    // Create a load rowset
+    RowsetMetaCloudPB expected_rowset_meta;
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string load_key = versioned::meta_rowset_load_key({instance_id, tablet_id, version});
+        expected_rowset_meta.set_rowset_id(0);
+        expected_rowset_meta.set_rowset_id_v2(fmt::format("test_load_rowset_{}", version));
+        expected_rowset_meta.set_start_version(version);
+        expected_rowset_meta.set_end_version(version);
+        expected_rowset_meta.set_num_rows(1000);
+        expected_rowset_meta.set_tablet_id(tablet_id);
+
+        // Pass a temporary copy rather than moving from expected_rowset_meta: the expected
+        // message is read again by the comparisons below and copied for the later update.
+        ASSERT_TRUE(versioned::document_put(txn.get(), load_key,
+                                            RowsetMetaCloudPB(expected_rowset_meta)));
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        // Test successful get with created transaction
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+        // Verify fields match
+        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
+        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
+        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
+        ASSERT_EQ(rowset_meta.num_rows(), expected_rowset_meta.num_rows());
+        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
+    }
+
+    {
+        // Test successful get with provided transaction
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err =
+                meta_reader.get_load_rowset_meta(txn.get(), tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+        // Verify key fields match
+        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
+        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
+        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
+        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
+    }
+
+    // Test with snapshot version functionality
+    Versionstamp snapshot_version;
+    {
+        // Capture the read version before the rowset is overwritten below
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        int64_t version_value = 0;
+        ASSERT_EQ(txn->get_read_version(&version_value), TxnErrorCode::TXN_OK);
+        snapshot_version = Versionstamp(version_value, 1);
+    }
+
+    // Update the rowset
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        std::string load_key = versioned::meta_rowset_load_key({instance_id, tablet_id, version});
+        RowsetMetaCloudPB updated_rowset_meta = expected_rowset_meta;
+        updated_rowset_meta.set_num_rows(2000); // Update row count
+
+        // updated_rowset_meta is not read after this call, so moving from it is fine.
+        ASSERT_TRUE(versioned::document_put(txn.get(), load_key, std::move(updated_rowset_meta)));
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        // Test reading with snapshot version - should get old data
+        MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+        // Should get original values
+        ASSERT_EQ(rowset_meta.num_rows(), 1000);
+    }
+
+    {
+        // Test reading without snapshot - should get new data
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+        // Should get updated values
+        ASSERT_EQ(rowset_meta.num_rows(), 2000);
+    }
+
+    {
+        // Test with snapshot flag
+        MetaReader meta_reader(instance_id, txn_kv.get(), true);
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+        // Should get current values since snapshot flag is set but no snapshot version
+        ASSERT_EQ(rowset_meta.num_rows(), 2000);
+    }
+
+    {
+        // Test getting non-existent version
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        RowsetMetaCloudPB rowset_meta;
+        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version + 1, &rowset_meta);
+        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+}
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index dd69a69f4bd..70e758a436a 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1907,9 +1907,8 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
     }
 }
 
-static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t 
table_id,
-                                     int64_t index_id, int64_t partition_id, 
int64_t tablet_id,
-                                     int64_t txn_id) {
+void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t 
table_id, int64_t index_id,
+                              int64_t partition_id, int64_t tablet_id, int64_t 
txn_id) {
     create_tablet(meta_service, table_id, index_id, partition_id, tablet_id);
     auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id);
     CreateRowsetResponse res;
diff --git a/cloud/test/meta_service_versioned_read_test.cpp 
b/cloud/test/meta_service_versioned_read_test.cpp
index 1638c7927e1..19d7159aa61 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -63,6 +63,9 @@ extern void insert_rowset(MetaServiceProxy* meta_service, 
int64_t db_id, const s
                           int64_t table_id, int64_t partition_id, int64_t 
tablet_id);
 extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t 
index_id,
                        int64_t partition_id, int64_t tablet_id);
+extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t 
table_id,
+                                     int64_t index_id, int64_t partition_id, 
int64_t tablet_id,
+                                     int64_t txn_id);
 
 void insert_compact_rowset(Transaction* txn, std::string instance_id, int64_t 
tablet_id,
                            int64_t partition_id, int64_t start_version, 
int64_t end_version,
@@ -100,6 +103,315 @@ static void create_and_refresh_instance(MetaServiceProxy* 
service, std::string i
     });                                                                        
    \
     SyncPoint::get_instance()->enable_processing();
 
+TEST(MetaServiceVersionedReadTest, CommitTxn) { // txn lifecycle, then a get_version check
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 666;
+    int64_t table_id = 1234;
+    int64_t index_id = 1235;
+    int64_t partition_id = 1236;
+
+    // case: first version of rowset (begin, precommit, commit, then two repeated commits)
+    {
+        int64_t txn_id = -1;
+        // begin txn and keep the txn id returned by the service
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label("test_label");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+        }
+
+        // mock rowset and tablet: five tablets in the same partition, one committed rowset each
+        int64_t tablet_id_base = 1103;
+        for (int i = 0; i < 5; ++i) {
+            create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id_base + i);
+            auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, 
partition_id, -1, 100);
+            CreateRowsetResponse res;
+            commit_rowset(meta_service.get(), tmp_rowset, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // precommit txn
+        {
+            brpc::Controller cntl;
+            PrecommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_precommit_timeout_ms(36000);
+            PrecommitTxnResponse res;
+            
meta_service->precommit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // commit txn
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // doubly commit txn: the repeat must return OK with an "already visible" message
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            auto found = res.status().msg().find(fmt::format(
+                    "transaction is already visible: db_id={} txn_id={}", 
db_id, txn_id));
+            ASSERT_TRUE(found != std::string::npos);
+        }
+
+        // doubly commit txn(2pc): with is_2pc set the repeat must fail with TXN_ALREADY_VISIBLE
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(true);
+            CommitTxnResponse res;
+            
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), 
MetaServiceCode::TXN_ALREADY_VISIBLE);
+            auto found = res.status().msg().find(
+                    fmt::format("transaction [{}] is already visible, not 
pre-committed.", txn_id));
+            ASSERT_TRUE(found != std::string::npos);
+        }
+    }
+
+    {
+        // Get the partition version: expected to be 2 after the single commit above
+        brpc::Controller cntl;
+        GetVersionRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.set_partition_id(partition_id);
+        GetVersionResponse res;
+        meta_service->get_version(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.version(), 2);
+    }
+}
+
+TEST(MetaServiceVersionedReadTest, CommitTxnWithSubTxnTest) { // one txn, three sub txns
+    auto meta_service = get_meta_service(false);
+    const std::string instance_id = "test_cloud_instance_id";
+    const std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 98131;
+    int64_t txn_id = -1;
+    int64_t t1 = 10;
+    int64_t t1_index = 100;
+    int64_t t1_p1 = 11;
+    int64_t t1_p1_t1 = 12;
+    int64_t t1_p1_t2 = 13;
+    int64_t t1_p2 = 14;
+    int64_t t1_p2_t1 = 15;
+    int64_t t2 = 16;
+    int64_t t2_index = 101;
+    int64_t t2_p3 = 17;
+    int64_t t2_p3_t1 = 18;
+    [[maybe_unused]] int64_t t2_p4 = 19;
+    [[maybe_unused]] int64_t t2_p4_t1 = 20;
+    std::string label = "test_label";
+    std::string label2 = "test_label_0";
+
+    // begin txn
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label(label);
+        txn_info_pb.add_table_ids(t1);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet: for sub_txn1 (which reuses the id of the main txn)
+    int64_t sub_txn_id1 = txn_id;
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, 
t1_p1_t1, sub_txn_id1);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, 
t1_p1_t2, sub_txn_id1);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, 
t1_p2_t1, sub_txn_id1);
+
+    // begin_sub_txn2
+    int64_t sub_txn_id2 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(0);
+        req.set_db_id(db_id);
+        req.set_label(label2);
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        BeginSubTxnResponse res;
+        
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id2 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+    }
+    // mock rowset and tablet: for sub_txn2
+    create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3, 
t2_p3_t1, sub_txn_id2);
+
+    // begin_sub_txn3
+    int64_t sub_txn_id3 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(1);
+        req.set_db_id(db_id);
+        req.set_label("test_label_1");
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        req.mutable_table_ids()->Add(t1);
+        BeginSubTxnResponse res;
+        
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id3 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+    }
+    // mock rowset and tablet: for sub_txn3
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, 
t1_p1_t1, sub_txn_id3);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, 
t1_p1_t2, sub_txn_id3);
+
+    // commit txn (the request is declared outside the scope so the repeat below can reuse it)
+    CommitTxnRequest req;
+    {
+        brpc::Controller cntl;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(666); // NOTE(review): differs from db_id (98131) above; confirm intended
+        req.set_txn_id(txn_id);
+        req.set_is_txn_load(true);
+
+        SubTxnInfo sub_txn_info1;
+        sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+        sub_txn_info1.set_table_id(t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);
+
+        SubTxnInfo sub_txn_info2;
+        sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+        sub_txn_info2.set_table_id(t2);
+        sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);
+
+        SubTxnInfo sub_txn_info3;
+        sub_txn_info3.set_sub_txn_id(sub_txn_id3);
+        sub_txn_info3.set_table_id(t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);
+
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3));
+        CommitTxnResponse res;
+        
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        // std::cout << res.DebugString() << std::endl;
+        ASSERT_EQ(res.table_ids().size(), 3);
+
+        ASSERT_EQ(res.table_ids()[0], t2);
+        ASSERT_EQ(res.partition_ids()[0], t2_p3);
+        ASSERT_EQ(res.versions()[0], 2);
+
+        ASSERT_EQ(res.table_ids()[1], t1);
+        ASSERT_EQ(res.partition_ids()[1], t1_p2);
+        ASSERT_EQ(res.versions()[1], 2);
+
+        ASSERT_EQ(res.table_ids()[2], t1);
+        ASSERT_EQ(res.partition_ids()[2], t1_p1) << res.ShortDebugString();
+        ASSERT_EQ(res.versions()[2], 3) << res.ShortDebugString();
+    }
+
+    // doubly commit txn: resend the same request; expect OK with an "already visible" message
+    {
+        brpc::Controller cntl;
+        CommitTxnResponse res;
+        
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        auto found = res.status().msg().find(
+                fmt::format("transaction is already visible: db_id={} 
txn_id={}", db_id, txn_id));
+        ASSERT_TRUE(found != std::string::npos);
+    }
+
+    // Verify the partition versions through a batch-mode get_version call
+    {
+        brpc::Controller ctrl;
+        GetVersionRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_batch_mode(true);
+        req.add_db_ids(db_id);
+        req.add_db_ids(db_id);
+        req.add_db_ids(db_id);
+        req.add_table_ids(t1);
+        req.add_table_ids(t1);
+        req.add_table_ids(t2);
+        req.add_partition_ids(t1_p1);
+        req.add_partition_ids(t1_p2);
+        req.add_partition_ids(t2_p3);
+
+        GetVersionResponse resp;
+        meta_service->get_version(&ctrl, &req, &resp, nullptr);
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+                << " status is " << resp.status().ShortDebugString();
+        ASSERT_EQ(resp.versions().size(), 3);
+        ASSERT_EQ(resp.versions()[0], 3); // t1_p1
+        ASSERT_EQ(resp.versions()[1], 2); // t1_p2
+        ASSERT_EQ(resp.versions()[2], 2); // t2_p3
+    }
+}
+
 TEST(MetaServiceVersionedReadTest, GetVersion) {
     auto service = get_meta_service(false);
 
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 325d1146032..0b6b04752d4 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -343,6 +343,21 @@ static void 
check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id
     ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), 
TxnErrorCode::TXN_KEY_NOT_FOUND);
 }
 
+// Stores an InstanceInfoPB for `instance_id` with multi_version_status set to
+// MULTI_VERSION_READ_WRITE, refreshes the resource manager for that instance, and asserts
+// that versioned writes are reported as enabled afterwards.
+static void create_and_refresh_instance(MetaServiceProxy* service, std::string instance_id) {
+    // Build the instance record.
+    InstanceInfoPB info;
+    info.set_instance_id(instance_id);
+    info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+
+    // Persist it in a dedicated transaction.
+    std::unique_ptr<Transaction> kv_txn;
+    ASSERT_EQ(service->txn_kv()->create_txn(&kv_txn), TxnErrorCode::TXN_OK);
+    kv_txn->put(instance_key(instance_id), info.SerializeAsString());
+    ASSERT_EQ(kv_txn->commit(), TxnErrorCode::TXN_OK);
+
+    // Reload the instance and check the resulting write mode.
+    service->resource_mgr()->refresh_instance(instance_id);
+    const bool versioned_write = service->resource_mgr()->is_version_write_enabled(instance_id);
+    ASSERT_TRUE(versioned_write);
+}
+
 TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
     auto txn_kv = get_mem_txn_kv();
     auto meta_service = get_meta_service(txn_kv, true);
@@ -743,6 +758,114 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
     sp->disable_processing();
 }
 
+// Commits a transaction with enable_txn_lazy_commit set on an instance created through
+// create_and_refresh_instance, then checks the KV state with the check_* helpers.
+// Sync-point callbacks record that the commit_txn_eventually hooks were reached.
+TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 7651485414;
+    int64_t table_id = 31478952181;
+    int64_t index_id = 89894141;
+    int64_t partition_id = 1241241;
+    bool commit_txn_eventually_finish_hit = false;
+    bool last_pending_txn_id_hit = false;
+    int repair_tablet_idx_count = 0;
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) {
+        bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]);
+        ASSERT_FALSE(need_repair_tablet_idx);
+        repair_tablet_idx_count++;
+    });
+
+    sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) {
+        int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+        ASSERT_EQ(last_pending_txn_id, 0);
+        last_pending_txn_id_hit = true;
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, false);
+    std::string instance_id = "test_instance";
+    std::string cloud_unique_id = "1:test_instance:1";
+    // Clears the callbacks on scope exit, including when an ASSERT_* returns early.
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    SyncPoint::get_instance()->enable_processing();
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    // 64-bit, so the id returned by begin_txn is stored without narrowing.
+    int64_t txn_id = 0;
+    {
+        // Begin transaction
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label_commit_txn_eventually2");
+        txn_info_pb.add_table_ids(table_id);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 3131124;
+    for (int i = 0; i < 5; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        // Commit with lazy commit enabled
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        // NOTE(review): always true, because the counter starts at 0 and is only incremented;
+        // it does not prove the need_repair_tablet_idx callback ran.
+        ASSERT_GE(repair_tablet_idx_count, 0);
+        ASSERT_TRUE(last_pending_txn_id_hit);
+        ASSERT_TRUE(commit_txn_eventually_finish_hit);
+    }
+
+    {
+        // Inspect the KV state for each tablet after the commit
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+        }
+    }
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
 TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
     auto txn_kv = get_mem_txn_kv();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to