This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4d4c0a9f6f5 [feat](cloud) commit_txn supports versioned read (#54643)
4d4c0a9f6f5 is described below
commit 4d4c0a9f6f56495a1c194c6bf49ce543dd2221e6
Author: walter <[email protected]>
AuthorDate: Wed Aug 13 20:11:08 2025 +0800
[feat](cloud) commit_txn supports versioned read (#54643)
---
cloud/src/meta-service/meta_service_txn.cpp | 148 ++++++++---
cloud/src/meta-service/txn_lazy_committer.cpp | 77 +++++-
cloud/src/meta-store/meta_reader.cpp | 47 ++++
cloud/src/meta-store/meta_reader.h | 25 ++
cloud/test/meta_reader_test.cpp | 296 ++++++++++++++++++++++
cloud/test/meta_service_test.cpp | 5 +-
cloud/test/meta_service_versioned_read_test.cpp | 312 ++++++++++++++++++++++++
cloud/test/txn_lazy_commit_test.cpp | 123 ++++++++++
8 files changed, 988 insertions(+), 45 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 07049ca9639..83f1bf909df 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -34,6 +34,7 @@
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
@@ -1143,6 +1144,11 @@ void MetaServiceImpl::commit_txn_immediately(
TxnErrorCode& err, KVStats& stats) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
+
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ MetaReader meta_reader(instance_id, txn_kv_.get());
+
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
int64_t last_pending_txn_id = 0;
@@ -1234,16 +1240,24 @@ void MetaServiceImpl::commit_txn_immediately(
// Prepare rowset meta and new_versions
AnnotateTag txn_tag("txn_id", txn_id);
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
- {
- auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
- std::ranges::ref_view(tmp_rowsets_meta) |
- std::ranges::views::transform(
- [](const auto& pair) { return
pair.second.tablet_id(); }));
+ auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
+ std::ranges::ref_view(tmp_rowsets_meta) |
+ std::ranges::views::transform(
+ [](const auto& pair) { return pair.second.tablet_id();
}));
+ if (!is_versioned_read) {
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id,
acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
+ } else {
+ err = meta_reader.get_tablet_indexes(txn.get(),
acquired_tablet_ids, &tablet_ids);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet indexes, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
}
std::unordered_map<int64_t, std::tuple<int64_t, int64_t>>
partition_indexes;
@@ -1256,10 +1270,23 @@ void MetaServiceImpl::commit_txn_immediately(
// {table/partition} -> version
std::unordered_map<int64_t, int64_t> versions;
- std::tie(code, msg) = get_partition_versions(txn.get(), &versions,
&last_pending_txn_id,
- instance_id,
partition_indexes);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ std::tie(code, msg) = get_partition_versions(txn.get(), &versions,
&last_pending_txn_id,
+ instance_id,
partition_indexes);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ std::vector<int64_t> partition_ids =
to_container<std::vector<int64_t>>(
+ std::ranges::ref_view(partition_indexes) |
std::ranges::views::keys);
+ err = meta_reader.get_partition_versions(txn.get(), partition_ids,
&versions,
+ &last_pending_txn_id);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition versions, err={}",
err);
+ LOG_WARNING(msg);
+ return;
+ }
}
if (last_pending_txn_id > 0) {
@@ -1298,6 +1325,10 @@ void MetaServiceImpl::commit_txn_immediately(
ss << "failed to get partition version key, the target version
not exists in "
"versions."
<< " txn_id=" << txn_id << " partition_id=" << partition_id;
+ ss << " versions";
+ for (const auto& [pid, ver] : versions) {
+ ss << " partition_id=" << pid << " version=" << ver;
+ }
msg = ss.str();
LOG(ERROR) << msg;
return;
@@ -1334,9 +1365,6 @@ void MetaServiceImpl::commit_txn_immediately(
return;
}
- bool is_versioned_write = is_version_write_enabled(instance_id);
- bool is_versioned_read = is_version_read_enabled(instance_id);
-
// Save rowset meta
for (auto& i : rowsets) {
auto [tablet_id, version] = i.first;
@@ -1679,6 +1707,10 @@ void MetaServiceImpl::commit_txn_eventually(
TxnErrorCode err = TxnErrorCode::TXN_OK;
int64_t txn_id = request->txn_id();
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ MetaReader meta_reader(instance_id, txn_kv_.get());
+
do {
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
int64_t last_pending_txn_id = 0;
@@ -1705,16 +1737,24 @@ void MetaServiceImpl::commit_txn_eventually(
// tablet_id -> {table/index/partition}_id
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
- {
- auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
- std::ranges::ref_view(tmp_rowsets_meta) |
- std::ranges::views::transform(
- [](const auto& pair) { return
pair.second.tablet_id(); }));
+ auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
+ std::ranges::ref_view(tmp_rowsets_meta) |
+ std::ranges::views::transform(
+ [](const auto& pair) { return pair.second.tablet_id();
}));
+ if (!is_versioned_read) {
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id,
acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
+ } else {
+ err = meta_reader.get_tablet_indexes(txn.get(),
acquired_tablet_ids, &tablet_ids);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet indexes, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
}
bool need_repair_tablet_idx =
@@ -1750,10 +1790,23 @@ void MetaServiceImpl::commit_txn_eventually(
}
std::unordered_map<int64_t, int64_t> versions;
- std::tie(code, msg) = get_partition_versions(txn.get(), &versions,
&last_pending_txn_id,
- instance_id,
partition_indexes);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ std::tie(code, msg) = get_partition_versions(txn.get(), &versions,
&last_pending_txn_id,
+ instance_id,
partition_indexes);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ std::vector<int64_t> partition_ids =
to_container<std::vector<int64_t>>(
+ std::ranges::ref_view(partition_indexes) |
std::ranges::views::keys);
+ err = meta_reader.get_partition_versions(txn.get(), partition_ids,
&versions,
+ &last_pending_txn_id);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition versions, err={}",
err);
+ LOG_WARNING(msg);
+ return;
+ }
}
TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id",
@@ -1859,8 +1912,6 @@ void MetaServiceImpl::commit_txn_eventually(
// lazy commit task will advance txn to make txn visible
txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);
- bool is_versioned_write = is_version_write_enabled(instance_id);
- bool is_versioned_read = is_version_read_enabled(instance_id);
txn_info.set_versioned_write(is_versioned_write);
txn_info.set_versioned_read(is_versioned_read);
@@ -2232,21 +2283,34 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
AnnotateTag txn_tag("txn_id", txn_id);
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+ bool is_versioned_read = is_version_read_enabled(instance_id);
+ MetaReader meta_reader(instance_id, txn_kv_.get());
+
// Prepare rowset meta and new_versions
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
- {
- // Read tablet indexes in batch.
- std::vector<int64_t> acquired_tablet_ids;
- for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
- for (const auto& [_, i] : tmp_rowsets_meta) {
- acquired_tablet_ids.push_back(i.tablet_id());
- }
+ std::vector<int64_t> acquired_tablet_ids;
+ for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+ for (const auto& [_, i] : tmp_rowsets_meta) {
+ acquired_tablet_ids.push_back(i.tablet_id());
}
+ }
+ if (!is_versioned_read) {
+ // Read tablet indexes in batch.
std::tie(code, msg) =
get_tablet_indexes(txn.get(), &tablet_ids, instance_id,
acquired_tablet_ids);
if (code != MetaServiceCode::OK) {
return;
}
+ } else {
+ TxnErrorCode err =
+ meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids,
&tablet_ids);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet indexes, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
}
// {table/partition} -> version
@@ -2260,10 +2324,23 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
// FIXME: handle pengding txn id in commit_txn_with_sub_txn.
int64_t last_pending_txn_id = 0;
- std::tie(code, msg) = get_partition_versions(txn.get(), &new_versions,
&last_pending_txn_id,
- instance_id,
partition_indexes);
- if (code != MetaServiceCode::OK) {
- return;
+ if (!is_versioned_read) {
+ std::tie(code, msg) = get_partition_versions(txn.get(), &new_versions,
&last_pending_txn_id,
+ instance_id,
partition_indexes);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ } else {
+ std::vector<int64_t> partition_ids =
to_container<std::vector<int64_t>>(
+ std::ranges::ref_view(partition_indexes) |
std::ranges::views::keys);
+ err = meta_reader.get_partition_versions(txn.get(), partition_ids,
&new_versions,
+ &last_pending_txn_id);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition versions, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
}
CommitTxnLogPB commit_txn_log;
@@ -2333,9 +2410,6 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
return;
}
- bool is_versioned_write = is_version_write_enabled(instance_id);
- bool is_versioned_read = is_version_read_enabled(instance_id);
-
// Save rowset meta
for (auto& i : rowsets) {
auto [tablet_id, version] = i.first;
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index ae5671bfb4a..9ef38ec9951 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -30,6 +30,8 @@
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
+#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
using namespace std::chrono;
@@ -137,6 +139,8 @@ void convert_tmp_rowsets(
return;
}
+ MetaReader meta_reader(instance_id, txn_kv.get());
+
// partition_id -> VersionPB
std::unordered_map<int64_t, VersionPB> partition_versions;
// tablet_id -> stats
@@ -186,7 +190,18 @@ void convert_tmp_rowsets(
return;
}
} else {
- CHECK(false) << "versioned read is not supported yet";
+ err = meta_reader.get_tablet_index(txn.get(),
tmp_rowset_pb.tablet_id(),
+ &tablet_idx_pb);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet index, txn_id=" << txn_id
+ << " tablet_id=" << tmp_rowset_pb.tablet_id() << "
err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
}
tablet_ids.emplace(tmp_rowset_pb.tablet_id(), tablet_idx_pb);
}
@@ -219,7 +234,20 @@ void convert_tmp_rowsets(
LOG(INFO) << "txn_id=" << txn_id << " key=" << hex(ver_key)
<< " version_pb:" << version_pb.ShortDebugString();
} else {
- CHECK(false) << "versioned read is not supported yet";
+ err = meta_reader.get_partition_version(txn.get(),
tmp_rowset_pb.partition_id(),
+ &version_pb, nullptr);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partition version, txn_id=" << txn_id
+ << " partition_id=" << tmp_rowset_pb.partition_id() <<
" err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ LOG(INFO) << "txn_id=" << txn_id << " partition_id=" <<
tmp_rowset_pb.partition_id()
+ << " version_pb:" << version_pb.ShortDebugString();
}
partition_versions.emplace(tmp_rowset_pb.partition_id(),
version_pb);
DCHECK_EQ(partition_versions.size(), 1) <<
partition_versions.size();
@@ -249,7 +277,22 @@ void convert_tmp_rowsets(
}
DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
} else {
- CHECK(true) << "versioned read is not supported yet";
+ int64_t tablet_id = tmp_rowset_pb.tablet_id();
+ RowsetMetaCloudPB rowset_val_pb;
+ err = meta_reader.get_load_rowset_meta(txn.get(), tablet_id,
version, &rowset_val_pb);
+ if (TxnErrorCode::TXN_OK == err) {
+ // tmp rowset key has been converted
+ continue;
+ }
+ if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get load_rowset_key, txn_id=" << txn_id
+ << " tablet_id=" << tablet_id << " version=" << version <<
" err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DCHECK(err == TxnErrorCode::TXN_KEY_NOT_FOUND);
}
tmp_rowset_pb.set_start_version(version);
@@ -471,6 +514,7 @@ void TxnLazyCommitTask::commit() {
bool is_versioned_write = txn_info.versioned_write();
bool is_versioned_read = txn_info.versioned_read();
+ MetaReader meta_reader(instance_id_, txn_kv_.get());
std::stringstream ss;
int retry_times = 0;
@@ -589,7 +633,20 @@ void TxnLazyCommitTask::commit() {
}
table_id = tablet_idx_pb.table_id();
} else {
- CHECK(false) << "versioned read is not supported yet";
+ TabletIndexPB tablet_idx_pb;
+ int64_t first_tablet_id =
tmp_rowset_metas.begin()->second.tablet_id();
+ err = meta_reader.get_tablet_index(first_tablet_id,
&tablet_idx_pb);
+ if (err != TxnErrorCode::TXN_OK) {
+ code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet index, txn_id=" <<
txn_id_
+ << " tablet_id=" << first_tablet_id << " err="
<< err;
+ msg_ = ss.str();
+ LOG(WARNING) << msg_;
+ break;
+ }
+ table_id = tablet_idx_pb.table_id();
}
}
@@ -620,7 +677,17 @@ void TxnLazyCommitTask::commit() {
break;
}
} else {
- CHECK(false) << "versioned read is not supported yet";
+ err = meta_reader.get_partition_version(partition_id,
&version_pb, nullptr);
+ if (TxnErrorCode::TXN_OK != err) {
+ code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get versioned partiton version,
txn_id=" << txn_id_
+ << " partition_id=" << partition_id << " err=" <<
err;
+ msg_ = ss.str();
+ LOG(WARNING) << msg_;
+ break;
+ }
}
if (version_pb.pending_txn_ids_size() > 0 &&
diff --git a/cloud/src/meta-store/meta_reader.cpp
b/cloud/src/meta-store/meta_reader.cpp
index d1b32a23a30..85123dd9e0c 100644
--- a/cloud/src/meta-store/meta_reader.cpp
+++ b/cloud/src/meta-store/meta_reader.cpp
@@ -363,6 +363,34 @@ TxnErrorCode MetaReader::get_partition_versions(
return TxnErrorCode::TXN_OK;
}
+TxnErrorCode MetaReader::get_partition_versions(Transaction* txn,
+ const std::vector<int64_t>&
partition_ids,
+ std::unordered_map<int64_t,
int64_t>* versions,
+ int64_t* last_pending_txn_id,
bool snapshot) {
+ std::unordered_map<int64_t, VersionPB> version_pb_map;
+ TxnErrorCode err =
+ get_partition_versions(txn, partition_ids, &version_pb_map,
nullptr, snapshot);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+
+ for (int64_t partition_id : partition_ids) {
+ auto it = version_pb_map.find(partition_id);
+ if (it == version_pb_map.end()) {
+ versions->emplace(partition_id, 1);
+ } else {
+ const VersionPB& version_pb = it->second;
+ int64_t version = version_pb.version();
+ versions->emplace(partition_id, version);
+ if (last_pending_txn_id && version_pb.pending_txn_ids_size() > 0) {
+ *last_pending_txn_id = version_pb.pending_txn_ids(0);
+ }
+ }
+ }
+
+ return TxnErrorCode::TXN_OK;
+}
+
TxnErrorCode MetaReader::get_rowset_metas(int64_t tablet_id, int64_t
start_version,
int64_t end_version,
std::vector<RowsetMetaCloudPB>*
rowset_metas,
@@ -470,6 +498,25 @@ TxnErrorCode MetaReader::get_rowset_metas(Transaction*
txn, int64_t tablet_id,
return TxnErrorCode::TXN_OK;
}
+TxnErrorCode MetaReader::get_load_rowset_meta(int64_t tablet_id, int64_t
version,
+ RowsetMetaCloudPB* rowset_meta,
bool snapshot) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ return get_load_rowset_meta(txn.get(), tablet_id, version, rowset_meta,
snapshot);
+}
+
+TxnErrorCode MetaReader::get_load_rowset_meta(Transaction* txn, int64_t
tablet_id, int64_t version,
+ RowsetMetaCloudPB* rowset_meta,
bool snapshot) {
+ std::string load_rowset_key =
+ versioned::meta_rowset_load_key({instance_id_, tablet_id,
version});
+ Versionstamp versionstamp;
+ return versioned::document_get(txn, load_rowset_key, snapshot_version_,
rowset_meta,
+ &versionstamp, snapshot);
+}
+
TxnErrorCode MetaReader::get_tablet_indexes(
const std::vector<int64_t>& tablet_ids,
std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes, bool
snapshot) {
diff --git a/cloud/src/meta-store/meta_reader.h
b/cloud/src/meta-store/meta_reader.h
index 3ee2e549b76..43a26f1904e 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -125,6 +125,17 @@ public:
return get_partition_versions(txn, partition_ids, versions,
versionstamps, snapshot_);
}
+ // Get the partition versions from the given partition_ids.
+ // If the partition id is not found, it will be set to 1 in the versions
map.
+ TxnErrorCode get_partition_versions(Transaction* txn, const
std::vector<int64_t>& partition_ids,
+ std::unordered_map<int64_t, int64_t>*
versions,
+ int64_t* last_pending_txn_id, bool
snapshot);
+ TxnErrorCode get_partition_versions(Transaction* txn, const
std::vector<int64_t>& partition_ids,
+ std::unordered_map<int64_t, int64_t>*
versions,
+ int64_t* last_pending_txn_id) {
+ return get_partition_versions(txn, partition_ids, versions,
last_pending_txn_id, snapshot_);
+ }
+
// Get the tablet load stats for the given tablet
//
// If the `tablet_stats` is not nullptr, it will be filled with the
deserialized TabletStatsPB.
@@ -227,6 +238,20 @@ public:
snapshot_);
}
+ // Get the load rowset meta for the given tablet_id and version.
+ TxnErrorCode get_load_rowset_meta(int64_t tablet_id, int64_t version,
+ RowsetMetaCloudPB* rowset_meta, bool
snapshot);
+ TxnErrorCode get_load_rowset_meta(Transaction* txn, int64_t tablet_id,
int64_t version,
+ RowsetMetaCloudPB* rowset_meta, bool
snapshot);
+ TxnErrorCode get_load_rowset_meta(int64_t tablet_id, int64_t version,
+ RowsetMetaCloudPB* rowset_meta) {
+ return get_load_rowset_meta(tablet_id, version, rowset_meta,
snapshot_);
+ }
+ TxnErrorCode get_load_rowset_meta(Transaction* txn, int64_t tablet_id,
int64_t version,
+ RowsetMetaCloudPB* rowset_meta) {
+ return get_load_rowset_meta(txn, tablet_id, version, rowset_meta,
snapshot_);
+ }
+
// Get the tablet meta keys.
TxnErrorCode get_tablet_meta(int64_t tablet_id, TabletMetaCloudPB*
tablet_meta,
Versionstamp* versionstamp, bool snapshot);
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index c30ae9a84bb..926ee3e6bd7 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -390,6 +390,166 @@ TEST(MetaReaderTest, BatchGetPartitionVersion) {
}
}
+TEST(MetaReaderTest, GetPartitionVersionsWithPendingTxn) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ std::string instance_id = "test_instance";
+ std::vector<int64_t> partition_ids = {2101, 2102, 2103, 2104};
+
+ {
+ // Test empty input
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ std::vector<int64_t> empty_ids;
+ std::unordered_map<int64_t, int64_t> versions;
+ int64_t last_pending_txn_id = -1;
+ TxnErrorCode err = meta_reader.get_partition_versions(txn.get(),
empty_ids, &versions,
+
&last_pending_txn_id);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_TRUE(versions.empty());
+ ASSERT_EQ(last_pending_txn_id, -1);
+ }
+
+ {
+ // Put some partition versions (skip partition_ids[1] to test partial
results)
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ for (size_t i = 0; i < partition_ids.size(); ++i) {
+ if (i == 1) continue; // Skip partition_ids[1]
+ std::string partition_version_key =
+ versioned::partition_version_key({instance_id,
partition_ids[i]});
+ VersionPB version_pb;
+ version_pb.set_version(100 + i * 10); // Different versions: 100,
120, 130
+ // Add pending transaction for partition_ids[2]
+ if (i == 2) {
+ version_pb.add_pending_txn_ids(3001);
+ version_pb.add_pending_txn_ids(3002);
+ }
+ versioned_put(txn.get(), partition_version_key,
version_pb.SerializeAsString());
+ }
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ {
+ // Test partial results - partition not found should be set to 1
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ std::unordered_map<int64_t, int64_t> versions;
+ int64_t last_pending_txn_id = -1;
+ TxnErrorCode err = meta_reader.get_partition_versions(txn.get(),
partition_ids, &versions,
+
&last_pending_txn_id);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(versions.size(), 4); // All partition_ids, missing one
should be set to 1
+
+ for (size_t i = 0; i < partition_ids.size(); ++i) {
+ int64_t partition_id = partition_ids[i];
+ ASSERT_NE(versions.find(partition_id), versions.end());
+ if (i == 1) {
+ // Missing partition should be set to 1
+ ASSERT_EQ(versions[partition_id], 1);
+ } else {
+ ASSERT_EQ(versions[partition_id], 100 + i * 10);
+ }
+ }
+ // Should return first pending transaction ID from partition_ids[2]
+ ASSERT_EQ(last_pending_txn_id, 3001);
+ }
+
+ {
+ // Put the missing partition version without pending transaction
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string partition_version_key =
+ versioned::partition_version_key({instance_id,
partition_ids[1]});
+ VersionPB version_pb;
+ version_pb.set_version(110);
+ versioned_put(txn.get(), partition_version_key,
version_pb.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ {
+ // Test all keys found with no pending transactions
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ std::unordered_map<int64_t, int64_t> versions;
+ int64_t last_pending_txn_id = -1;
+ TxnErrorCode err = meta_reader.get_partition_versions(txn.get(),
partition_ids, &versions,
+
&last_pending_txn_id);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(versions.size(), partition_ids.size());
+
+ for (size_t i = 0; i < partition_ids.size(); ++i) {
+ int64_t partition_id = partition_ids[i];
+ ASSERT_NE(versions.find(partition_id), versions.end());
+ int32_t expected_version = (i == 1) ? 110 : 100 + i * 10;
+ ASSERT_EQ(versions[partition_id], expected_version);
+ }
+ // Should still return first pending transaction ID from
partition_ids[2]
+ ASSERT_EQ(last_pending_txn_id, 3001);
+ }
+
+ {
+ // Remove pending transactions from partition_ids[2]
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string partition_version_key =
+ versioned::partition_version_key({instance_id,
partition_ids[2]});
+ VersionPB version_pb;
+ version_pb.set_version(120);
+ // No pending transactions this time
+ versioned_put(txn.get(), partition_version_key,
version_pb.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ {
+ // Test with no pending transactions
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ std::unordered_map<int64_t, int64_t> versions;
+ int64_t last_pending_txn_id = -1;
+ TxnErrorCode err = meta_reader.get_partition_versions(txn.get(),
partition_ids, &versions,
+
&last_pending_txn_id);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(versions.size(), partition_ids.size());
+
+ for (size_t i = 0; i < partition_ids.size(); ++i) {
+ int64_t partition_id = partition_ids[i];
+ ASSERT_NE(versions.find(partition_id), versions.end());
+ int32_t expected_version = (i == 1) ? 110 : (i == 2) ? 120 : 100 +
i * 10;
+ ASSERT_EQ(versions[partition_id], expected_version);
+ }
+ // No pending transactions, so last_pending_txn_id should remain -1
+ ASSERT_EQ(last_pending_txn_id, -1);
+ }
+
+ {
+ // Test using the convenience method (without snapshot parameter)
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ std::unordered_map<int64_t, int64_t> versions;
+ int64_t last_pending_txn_id = -1;
+ TxnErrorCode err = meta_reader.get_partition_versions(txn.get(),
partition_ids, &versions,
+
&last_pending_txn_id);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(versions.size(), partition_ids.size());
+ // Verify same results as the explicit snapshot=false test
+ for (size_t i = 0; i < partition_ids.size(); ++i) {
+ int64_t partition_id = partition_ids[i];
+ ASSERT_NE(versions.find(partition_id), versions.end());
+ int32_t expected_version = (i == 1) ? 110 : (i == 2) ? 120 : 100 +
i * 10;
+ ASSERT_EQ(versions[partition_id], expected_version);
+ }
+ ASSERT_EQ(last_pending_txn_id, -1);
+ }
+}
+
TEST(MetaReaderTest, GetTabletLoadStats) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
@@ -1300,3 +1460,139 @@ TEST(MetaReaderTest, GetPartitionIndex) {
ASSERT_EQ(partition_index.table_id(), 200);
}
}
+
+TEST(MetaReaderTest, GetLoadRowsetMeta) {
+ using doris::RowsetMetaCloudPB;
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ std::string instance_id = "test_instance";
+ int64_t tablet_id = 5001;
+ int64_t version = 10;
+
+ {
+ // Test key not found when no rowset exists
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+
+ // Create a load rowset
+ RowsetMetaCloudPB expected_rowset_meta;
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string load_key = versioned::meta_rowset_load_key({instance_id,
tablet_id, version});
+ expected_rowset_meta.set_rowset_id(0);
+
expected_rowset_meta.set_rowset_id_v2(fmt::format("test_load_rowset_{}",
version));
+ expected_rowset_meta.set_start_version(version);
+ expected_rowset_meta.set_end_version(version);
+ expected_rowset_meta.set_num_rows(1000);
+ expected_rowset_meta.set_tablet_id(tablet_id);
+
+ ASSERT_TRUE(versioned::document_put(txn.get(), load_key,
std::move(expected_rowset_meta)));
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ {
+ // Test successful get with created transaction
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // Verify fields match
+ ASSERT_EQ(rowset_meta.rowset_id_v2(),
expected_rowset_meta.rowset_id_v2());
+ ASSERT_EQ(rowset_meta.start_version(),
expected_rowset_meta.start_version());
+ ASSERT_EQ(rowset_meta.end_version(),
expected_rowset_meta.end_version());
+ ASSERT_EQ(rowset_meta.num_rows(), expected_rowset_meta.num_rows());
+ ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
+ }
+
+ {
+ // Test successful get with provided transaction
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err =
+ meta_reader.get_load_rowset_meta(txn.get(), tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // Verify key fields match
+ ASSERT_EQ(rowset_meta.rowset_id_v2(),
expected_rowset_meta.rowset_id_v2());
+ ASSERT_EQ(rowset_meta.start_version(),
expected_rowset_meta.start_version());
+ ASSERT_EQ(rowset_meta.end_version(),
expected_rowset_meta.end_version());
+ ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
+ }
+
+ // Test with snapshot version functionality
+ Versionstamp snapshot_version;
+ {
+ // Get current snapshot version
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ int64_t version_value = 0;
+ ASSERT_EQ(txn->get_read_version(&version_value), TxnErrorCode::TXN_OK);
+ snapshot_version = Versionstamp(version_value, 1);
+ }
+
+ // Update the rowset
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string load_key = versioned::meta_rowset_load_key({instance_id,
tablet_id, version});
+ RowsetMetaCloudPB updated_rowset_meta = expected_rowset_meta;
+ updated_rowset_meta.set_num_rows(2000); // Update row count
+
+ ASSERT_TRUE(versioned::document_put(txn.get(), load_key,
std::move(updated_rowset_meta)));
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ {
+ // Test reading with snapshot version - should get old data
+ MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // Should get original values
+ ASSERT_EQ(rowset_meta.num_rows(), 1000);
+ }
+
+ {
+ // Test reading without snapshot - should get new data
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // Should get updated values
+ ASSERT_EQ(rowset_meta.num_rows(), 2000);
+ }
+
+ {
+ // Test with snapshot flag
+ MetaReader meta_reader(instance_id, txn_kv.get(), true);
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id,
version, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+ // Should get current values since snapshot flag is set but no
snapshot version
+ ASSERT_EQ(rowset_meta.num_rows(), 2000);
+ }
+
+ {
+ // Test getting non-existent version
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ RowsetMetaCloudPB rowset_meta;
+ TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version
+ 1, &rowset_meta);
+ ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+}
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index dd69a69f4bd..70e758a436a 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1907,9 +1907,8 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
}
}
-static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t
table_id,
- int64_t index_id, int64_t partition_id,
int64_t tablet_id,
- int64_t txn_id) {
+void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t
table_id, int64_t index_id,
+ int64_t partition_id, int64_t tablet_id, int64_t
txn_id) {
create_tablet(meta_service, table_id, index_id, partition_id, tablet_id);
auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id);
CreateRowsetResponse res;
diff --git a/cloud/test/meta_service_versioned_read_test.cpp b/cloud/test/meta_service_versioned_read_test.cpp
index 1638c7927e1..19d7159aa61 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -63,6 +63,9 @@ extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const s
int64_t table_id, int64_t partition_id, int64_t
tablet_id);
extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t
index_id,
int64_t partition_id, int64_t tablet_id);
+extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t
table_id,
+ int64_t index_id, int64_t partition_id,
int64_t tablet_id,
+ int64_t txn_id);
void insert_compact_rowset(Transaction* txn, std::string instance_id, int64_t
tablet_id,
int64_t partition_id, int64_t start_version,
int64_t end_version,
@@ -100,6 +103,315 @@ static void create_and_refresh_instance(MetaServiceProxy* service, std::string i
});
\
SyncPoint::get_instance()->enable_processing();
+// Commits a single txn that wrote one rowset to each of five tablets in one partition, then
+// checks that repeated commits (plain and 2PC) report the txn as already visible and that
+// get_version returns 2 for the partition.
+TEST(MetaServiceVersionedReadTest, CommitTxn) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 666;
+    int64_t table_id = 1234;
+    int64_t index_id = 1235;
+    int64_t partition_id = 1236;
+
+    // case: first version of rowset
+    {
+        int64_t txn_id = -1;
+        // begin txn
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label("test_label");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id = res.txn_id();
+        }
+
+        // mock rowset and tablet: one rowset (txn_id) per tablet, all in the same partition
+        int64_t tablet_id_base = 1103;
+        for (int i = 0; i < 5; ++i) {
+            create_tablet(meta_service.get(), table_id, index_id, partition_id,
+                          tablet_id_base + i);
+            auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id, -1, 100);
+            CreateRowsetResponse res;
+            commit_rowset(meta_service.get(), tmp_rowset, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // precommit txn
+        {
+            brpc::Controller cntl;
+            PrecommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_precommit_timeout_ms(36000);
+            PrecommitTxnResponse res;
+            meta_service->precommit_txn(
+                    reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+                    nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // commit txn
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        // doubly commit txn: still OK, but the message reports the txn as already visible
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            auto found = res.status().msg().find(fmt::format(
+                    "transaction is already visible: db_id={} txn_id={}", db_id, txn_id));
+            ASSERT_TRUE(found != std::string::npos);
+        }
+
+        // doubly commit txn(2pc): rejected with TXN_ALREADY_VISIBLE
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id);
+            req.set_is_2pc(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE);
+            auto found = res.status().msg().find(
+                    fmt::format("transaction [{}] is already visible, not pre-committed.", txn_id));
+            ASSERT_TRUE(found != std::string::npos);
+        }
+    }
+
+    {
+        // Get the partition versions
+        brpc::Controller cntl;
+        GetVersionRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.set_partition_id(partition_id);
+        GetVersionResponse res;
+        meta_service->get_version(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.version(), 2);
+    }
+}
+
+// Commits a txn load made of three sub-txns spanning two tables. t1_p1 is written by two
+// sub-txns, so its version is expected to reach 3 while t1_p2 and t2_p3 reach 2.
+TEST(MetaServiceVersionedReadTest, CommitTxnWithSubTxnTest) {
+    auto meta_service = get_meta_service(false);
+    const std::string instance_id = "test_cloud_instance_id";
+    const std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 98131;
+    int64_t txn_id = -1;
+    int64_t t1 = 10;
+    int64_t t1_index = 100;
+    int64_t t1_p1 = 11;
+    int64_t t1_p1_t1 = 12;
+    int64_t t1_p1_t2 = 13;
+    int64_t t1_p2 = 14;
+    int64_t t1_p2_t1 = 15;
+    int64_t t2 = 16;
+    int64_t t2_index = 101;
+    int64_t t2_p3 = 17;
+    int64_t t2_p3_t1 = 18;
+    std::string label = "test_label";
+    std::string label2 = "test_label_0";
+
+    // begin txn
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label(label);
+        txn_info_pb.add_table_ids(t1);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet: for sub_txn1 (the first sub txn reuses the parent txn id)
+    int64_t sub_txn_id1 = txn_id;
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id1);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id1);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1, sub_txn_id1);
+
+    // begin_sub_txn2
+    int64_t sub_txn_id2 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(0);
+        req.set_db_id(db_id);
+        req.set_label(label2);
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        BeginSubTxnResponse res;
+        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id2 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+    }
+    // mock rowset and tablet: for sub_txn2
+    create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1, sub_txn_id2);
+
+    // begin_sub_txn3
+    int64_t sub_txn_id3 = -1;
+    {
+        brpc::Controller cntl;
+        BeginSubTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_txn_id(txn_id);
+        req.set_sub_txn_num(1);
+        req.set_db_id(db_id);
+        req.set_label("test_label_1");
+        req.mutable_table_ids()->Add(t1);
+        req.mutable_table_ids()->Add(t2);
+        req.mutable_table_ids()->Add(t1);
+        BeginSubTxnResponse res;
+        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+        ASSERT_TRUE(res.has_sub_txn_id());
+        sub_txn_id3 = res.sub_txn_id();
+        ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+    }
+    // mock rowset and tablet: for sub_txn3 (writes t1_p1 a second time)
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id3);
+    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id3);
+
+    // commit txn; `req` is kept alive so the doubly-commit case below can resend it
+    CommitTxnRequest req;
+    {
+        brpc::Controller cntl;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_txn_load(true);
+
+        SubTxnInfo sub_txn_info1;
+        sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+        sub_txn_info1.set_table_id(t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
+        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);
+
+        SubTxnInfo sub_txn_info2;
+        sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+        sub_txn_info2.set_table_id(t2);
+        sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);
+
+        SubTxnInfo sub_txn_info3;
+        sub_txn_info3.set_sub_txn_id(sub_txn_id3);
+        sub_txn_info3.set_table_id(t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
+        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);
+
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3));
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(res.table_ids().size(), 3);
+
+        ASSERT_EQ(res.table_ids()[0], t2);
+        ASSERT_EQ(res.partition_ids()[0], t2_p3);
+        ASSERT_EQ(res.versions()[0], 2);
+
+        ASSERT_EQ(res.table_ids()[1], t1);
+        ASSERT_EQ(res.partition_ids()[1], t1_p2);
+        ASSERT_EQ(res.versions()[1], 2);
+
+        ASSERT_EQ(res.table_ids()[2], t1);
+        ASSERT_EQ(res.partition_ids()[2], t1_p1) << res.ShortDebugString();
+        ASSERT_EQ(res.versions()[2], 3) << res.ShortDebugString();
+    }
+
+    // doubly commit txn
+    {
+        brpc::Controller cntl;
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        auto found = res.status().msg().find(
+                fmt::format("transaction is already visible: db_id={} txn_id={}", db_id, txn_id));
+        ASSERT_TRUE(found != std::string::npos);
+    }
+
+    // Verify the partition versions
+    {
+        brpc::Controller ctrl;
+        GetVersionRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_batch_mode(true);
+        req.add_db_ids(db_id);
+        req.add_db_ids(db_id);
+        req.add_db_ids(db_id);
+        req.add_table_ids(t1);
+        req.add_table_ids(t1);
+        req.add_table_ids(t2);
+        req.add_partition_ids(t1_p1);
+        req.add_partition_ids(t1_p2);
+        req.add_partition_ids(t2_p3);
+
+        GetVersionResponse resp;
+        meta_service->get_version(&ctrl, &req, &resp, nullptr);
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
+                << " status is " << resp.status().ShortDebugString();
+        ASSERT_EQ(resp.versions().size(), 3);
+        ASSERT_EQ(resp.versions()[0], 3); // t1_p1
+        ASSERT_EQ(resp.versions()[1], 2); // t1_p2
+        ASSERT_EQ(resp.versions()[2], 2); // t2_p3
+    }
+}
+
TEST(MetaServiceVersionedReadTest, GetVersion) {
auto service = get_meta_service(false);
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 325d1146032..0b6b04752d4 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -343,6 +343,21 @@ static void check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id
ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
}
+// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
+// Fails the calling test if the kv write fails or if versioned writes are not reported as
+// enabled for the instance after the refresh.
+static void create_and_refresh_instance(MetaServiceProxy* service,
+                                        const std::string& instance_id) {
+    // write instance info directly through the service's txn kv
+    InstanceInfoPB instance_info;
+    instance_info.set_instance_id(instance_id);
+    instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    txn->put(instance_key(instance_id), instance_info.SerializeAsString());
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    // reload the instance so the resource manager sees the new multi-version status
+    service->resource_mgr()->refresh_instance(instance_id);
+    ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
+}
+
TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) {
auto txn_kv = get_mem_txn_kv();
auto meta_service = get_meta_service(txn_kv, true);
@@ -743,6 +758,114 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
sp->disable_processing();
}
+// Commits a txn with enable_txn_lazy_commit on a MULTI_VERSION_READ_WRITE instance and checks
+// that the commit_txn_eventually sync points fire, the tmp rowsets are gone, and a rowset meta
+// exists at version 2 for every tablet.
+TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 7651485414;
+    int64_t table_id = 31478952181;
+    int64_t index_id = 89894141;
+    int64_t partition_id = 1241241;
+    bool commit_txn_eventually_finish_hit = false;
+    bool last_pending_txn_id_hit = false;
+    int repair_tablet_idx_count = 0;
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) {
+        bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]);
+        ASSERT_FALSE(need_repair_tablet_idx);
+        repair_tablet_idx_count++;
+    });
+
+    sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) {
+        int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+        ASSERT_EQ(last_pending_txn_id, 0);
+        last_pending_txn_id_hit = true;
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, false);
+    std::string instance_id = "test_instance";
+    std::string cloud_unique_id = "1:test_instance:1";
+    // Reset the global SyncPoint on every exit path, including an early return from a failed
+    // ASSERT_*, so the callbacks above (which capture this test's locals by reference) are not
+    // left registered and enabled.
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+        SyncPoint::get_instance()->clear_trace();
+        SyncPoint::get_instance()->disable_processing();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    // txn ids are int64_t; a plain `int` would silently truncate them
+    int64_t txn_id = 0;
+    {
+        // Begin transaction
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label_commit_txn_eventually2");
+        txn_info_pb.add_table_ids(table_id);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 3131124;
+    for (int i = 0; i < 5; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        // NOTE(review): trivially true; the meaningful check (no tablet idx repair needed)
+        // is the ASSERT_FALSE inside the need_repair_tablet_idx callback.
+        ASSERT_GE(repair_tablet_idx_count, 0);
+        ASSERT_TRUE(last_pending_txn_id_hit);
+        ASSERT_TRUE(commit_txn_eventually_finish_hit);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+        }
+    }
+}
+
TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
auto txn_kv = get_mem_txn_kv();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]