This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2979e60e34 [Fix](cloud-mow) Check partition's version to avoid
wrongly update visible versions' delete bitmaps (#49710)
a2979e60e34 is described below
commit a2979e60e34c9be602cc8c0a214d6e61dce2f16c
Author: bobhan1 <[email protected]>
AuthorDate: Thu Apr 3 16:56:56 2025 +0800
[Fix](cloud-mow) Check partition's version to avoid wrongly update visible
versions' delete bitmaps (#49710)
### What problem does this PR solve?
Consider the following problem:
1. Transaction X acquires the lock and attempts to publish with version
a. The corresponding delete bitmap calculation task, task (1), is sent to
the BE. At this point, the tablet's maximum version is a-1, and task (1)
starts computation.
2. Transaction X fails on FE due to timeout and releases the lock.
3. Transaction Y acquires the lock, attempts to publish with version a,
and succeeds.
4. Transaction X retries and acquires the lock again, and attempts to
publish with version b.
5. Meanwhile, task (1) from Transaction X completes its computation on
BE and writes the generated delete bitmap to the MS with version a.
**Since Transaction X currently holds the lock, this write operation
succeeds, overwriting the delete bitmaps of the actual version a that were
written by Transaction Y.**
6. Subsequent transactions on the tablet will use the pending delete
bitmap to delete the version a delete bitmap written by task (1) in the
MS.
The root cause is that when a load txn retries in the publish phase, the
locks it acquires on each attempt are logically different, but the current
implementation cannot tell them apart because they have the same lock_id
and initiator.
This PR checks the target partition's version when updating delete bitmaps
to avoid this problem.
---
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 6 +-
be/src/cloud/cloud_meta_mgr.cpp | 6 +-
be/src/cloud/cloud_meta_mgr.h | 2 +-
be/src/cloud/cloud_tablet.cpp | 16 +-
be/src/cloud/cloud_tablet.h | 7 +-
be/src/olap/base_tablet.cpp | 7 +-
be/src/olap/base_tablet.h | 3 +-
be/src/olap/tablet.cpp | 3 +-
be/src/olap/tablet.h | 4 +-
cloud/src/meta-service/meta_service.cpp | 114 +++++-
cloud/src/meta-service/meta_service_txn.cpp | 1 +
cloud/test/meta_service_test.cpp | 384 ++++++++++++++++++++-
gensrc/proto/cloud.proto | 4 +
13 files changed, 533 insertions(+), 24 deletions(-)
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 06ebf249edb..53638abffc2 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -313,8 +313,10 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
// we still need to update delete bitmap KVs to MS when we skip to
calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may
have been removed and replaced by other txns
int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1;
- RETURN_IF_ERROR(
- tablet->save_delete_bitmap_to_ms(version, transaction_id,
delete_bitmap, lock_id));
+ int64_t next_visible_version =
+ txn_info.is_txn_load ? txn_info.next_visible_version : version;
+ RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(version,
transaction_id, delete_bitmap,
+ lock_id,
next_visible_version));
LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ", publish_status=SUCCEED, not need to re-calculate
delete_bitmaps.";
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 2ab18b9ad9d..839f46ce1cd 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1190,7 +1190,8 @@ Status CloudMetaMgr::update_tablet_schema(int64_t
tablet_id, const TabletSchema&
Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t
lock_id,
int64_t initiator, DeleteBitmap*
delete_bitmap,
- int64_t txn_id, bool
is_explicit_txn) {
+ int64_t txn_id, bool is_explicit_txn,
+ int64_t next_visible_version) {
VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
@@ -1204,6 +1205,9 @@ Status CloudMetaMgr::update_delete_bitmap(const
CloudTablet& tablet, int64_t loc
if (txn_id > 0) {
req.set_txn_id(txn_id);
}
+ if (next_visible_version > 0) {
+ req.set_next_visible_version(next_visible_version);
+ }
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 1dd09de3705..9f36d24e6d3 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -100,7 +100,7 @@ public:
Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator,
DeleteBitmap* delete_bitmap, int64_t txn_id =
-1,
- bool is_explicit_txn = false);
+ bool is_explicit_txn = false, int64_t
next_visible_version = -1);
Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap*
delete_bitmap);
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 692bf0a84c5..0c538c3ff22 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -699,8 +699,8 @@ CalcDeleteBitmapExecutor*
CloudTablet::calc_delete_bitmap_executor() {
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
- const RowsetIdUnorderedSet&
cur_rowset_ids,
- int64_t lock_id) {
+ const RowsetIdUnorderedSet&
cur_rowset_ids, int64_t lock_id,
+ int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying
again
@@ -716,7 +716,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}
- RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id,
delete_bitmap, lock_id));
+ RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id,
delete_bitmap, lock_id,
+ next_visible_version));
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache
because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when
re-calculating the delete bitmap, during which it will do
@@ -746,7 +747,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t tx
}
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t
txn_id,
- DeleteBitmapPtr delete_bitmap,
int64_t lock_id) {
+ DeleteBitmapPtr delete_bitmap,
int64_t lock_id,
+ int64_t next_visible_version) {
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
@@ -757,11 +759,13 @@ Status CloudTablet::save_delete_bitmap_to_ms(int64_t
cur_version, int64_t txn_id
iter->second);
}
}
- auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
// lock_id != -1 means this is in an explict txn
+ bool is_explicit_txn = (lock_id != -1);
+ auto ms_lock_id = !is_explicit_txn ? txn_id : lock_id;
+
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id,
LOAD_INITIATOR_ID,
new_delete_bitmap.get(), txn_id,
- (lock_id != -1)));
+ is_explicit_txn,
next_visible_version));
return Status::OK();
}
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 03ba47b27a9..bc8bc320aaa 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -178,11 +178,12 @@ public:
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids,
- int64_t lock_id = -1) override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1,
+ int64_t next_visible_version = -1) override;
Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
- DeleteBitmapPtr delete_bitmap, int64_t
lock_id);
+ DeleteBitmapPtr delete_bitmap, int64_t
lock_id,
+ int64_t next_visible_version);
Status calc_delete_bitmap_for_compaction(const
std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr&
output_rowset,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 938f36af423..d5cffe89e2d 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1425,9 +1425,9 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
auto t1 = watch.get_elapse_time_us();
+ int64_t next_visible_version = txn_info->is_txn_load ?
txn_info->next_visible_version
+ :
txn_info->rowset->start_version();
{
- int64_t next_visible_version = txn_info->is_txn_load ?
txn_info->next_visible_version
- :
txn_info->rowset->start_version();
std::shared_lock meta_rlock(self->_meta_lock);
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
if (self->tablet_state() == TABLET_NOTREADY) {
@@ -1572,7 +1572,8 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
auto t5 = watch.get_elapse_time_us();
int64_t lock_id = txn_info->is_txn_load ? txn_info->lock_id : -1;
RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
- transient_rs_writer.get(),
cur_rowset_ids, lock_id));
+ transient_rs_writer.get(),
cur_rowset_ids, lock_id,
+ next_visible_version));
// defensive check, check that the delete bitmap cache we wrote is correct
RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id,
delete_bitmap.get()));
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 7407ea40284..abf89b833b4 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -241,11 +241,10 @@ public:
static Status update_delete_bitmap(const BaseTabletSPtr& self,
TabletTxnInfo* txn_info,
int64_t txn_id, int64_t txn_expiration
= 0,
DeleteBitmapPtr tablet_delete_bitmap =
nullptr);
-
virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet&
cur_rowset_ids,
- int64_t lock_id = -1) = 0;
+ int64_t lock_id = -1, int64_t
next_visible_version = -1) = 0;
virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
void calc_compaction_output_rowset_delete_bitmap(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 108c125fba0..dc8f665ec00 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2535,7 +2535,8 @@ CalcDeleteBitmapExecutor*
Tablet::calc_delete_bitmap_executor() {
Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id) {
+ const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id,
+ int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index afe043bf151..77768cf821d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -419,8 +419,8 @@ public:
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids,
- int64_t lock_id = -1) override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1,
+ int64_t next_visible_version = -1) override;
void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
bool check_all_rowset_segment();
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 03882b17932..5ba8ac617f1 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1849,6 +1849,106 @@ static bool
remove_pending_delete_bitmap(MetaServiceCode& code, std::string& msg
return true;
}
+// When a load txn retries in publish phase with different version to publish,
it will gain delete bitmap lock
+// many times. these locks are *different*, but they are the same in the
current implementation because they have
+// the same lock_id and initiator and don't have version info. If some delete
bitmap calculation task with version X
+// on BE lasts long and try to update delete bitmaps on MS when the txn gains
the lock in later retries
+// with version Y(Y > X) to publish. It may wrongly update version X's delete
bitmaps because the lock don't have version info.
+//
+// This function checks whether the partition version is correct when updating
the delete bitmap
+// to avoid wrongly update an visible version's delete bitmaps.
+// 1. get the db id with txn id
+// 2. get the partition version with db id, table id and partition id
+// 3. check if the partition version matches the updating version
+static bool check_partition_version_when_update_delete_bitmap(
+ MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>&
txn,
+ std::string& instance_id, int64_t table_id, int64_t partition_id,
int64_t tablet_id,
+ int64_t txn_id, int64_t next_visible_version) {
+ if (partition_id <= 0) {
+ LOG(WARNING) << fmt::format(
+ "invalid partition_id, skip to check partition version.
txn={}, "
+ "table_id={}, partition_id={}, tablet_id={}",
+ txn_id, table_id, partition_id, tablet_id);
+ return true;
+ }
+ // Get db id with txn id
+ std::string index_val;
+ const std::string index_key = txn_index_key({instance_id, txn_id});
+ auto err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id,
err);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ TxnIndexPB index_pb;
+ if (!index_pb.ParseFromString(index_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ DCHECK(index_pb.has_tablet_index())
+ << fmt::format("txn={}, table_id={}, partition_id={},
tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ DCHECK(index_pb.tablet_index().has_db_id())
+ << fmt::format("txn={}, table_id={}, partition_id={},
tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+ LOG(WARNING) << fmt::format(
+ "has no db_id in TxnIndexPB, skip to check partition version.
txn={}, "
+ "table_id={}, partition_id={}, tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ return true;
+ }
+ int64_t db_id = index_pb.tablet_index().db_id();
+
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition version, txn_id={},
tablet={}, err={}", txn_id,
+ tablet_id, err);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ int64_t cur_max_version {-1};
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ cur_max_version = 1;
+ } else {
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(ver_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse version_pb, txn_id={},
tablet={}, key={}", txn_id,
+ tablet_id, hex(ver_key));
+ LOG(WARNING) << msg;
+ return false;
+ }
+ DCHECK(version_pb.has_version());
+ cur_max_version = version_pb.version();
+
+ if (version_pb.pending_txn_ids_size() > 0) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ cur_max_version += version_pb.pending_txn_ids_size();
+ }
+ }
+
+ if (cur_max_version + 1 != next_visible_version) {
+ code = MetaServiceCode::VERSION_NOT_MATCH;
+ msg = fmt::format(
+ "check version failed when update_delete_bitmap, txn={},
table_id={}, "
+ "partition_id={}, tablet_id={}, found partition's max version
is {}, but "
+ "request next_visible_version is {}",
+ txn_id, table_id, partition_id, tablet_id, cur_max_version,
next_visible_version);
+ return false;
+ }
+ return true;
+}
+
void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController*
controller,
const UpdateDeleteBitmapRequest*
request,
UpdateDeleteBitmapResponse*
response,
@@ -1908,7 +2008,17 @@ void
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}
- // 3. store all pending delete bitmap for this txn
+ // 3. check if partition's version matches
+ if (request->lock_id() > 0 && request->has_txn_id() &&
request->partition_id() &&
+ request->has_next_visible_version()) {
+ if (!check_partition_version_when_update_delete_bitmap(
+ code, msg, txn, instance_id, table_id,
request->partition_id(), tablet_id,
+ request->txn_id(), request->next_visible_version())) {
+ return;
+ }
+ }
+
+ // 4. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id,
request->rowset_ids(i),
@@ -1984,7 +2094,7 @@ void
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}
- // 4. Update delete bitmap for curent txn
+ // 5. Update delete bitmap for curent txn
size_t current_key_count = 0;
size_t current_value_count = 0;
size_t total_key_count = 0;
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 110fbb1d758..d87a6b08fc3 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3131,6 +3131,7 @@ void
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
const std::string index_key = txn_index_key({instance_id, sub_txn_id});
std::string index_val;
TxnIndexPB index_pb;
+ index_pb.mutable_tablet_index()->set_db_id(db_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 6c22b002db3..9bc17c228a2 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -154,6 +154,18 @@ static void create_tablet(MetaServiceProxy* meta_service,
int64_t table_id, int6
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
+static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t
db_id,
+ int64_t table_id, int64_t index_id,
int64_t partition_id,
+ int64_t tablet_id) {
+ brpc::Controller cntl;
+ CreateTabletsRequest req;
+ CreateTabletsResponse res;
+ req.set_db_id(db_id);
+ add_tablet(req, table_id, index_id, partition_id, tablet_id);
+ meta_service->create_tablets(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
+}
+
static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
int64_t table_id, int64_t& txn_id) {
brpc::Controller cntl;
@@ -2050,7 +2062,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
index_key = txn_index_key({mock_instance, sub_txn_id3});
ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK);
txn_index.ParseFromString(index_val);
- ASSERT_FALSE(txn_index.has_tablet_index());
+ ASSERT_TRUE(txn_index.has_tablet_index());
// txn_label
std::string label_key = txn_label_key({mock_instance, db_id, label});
@@ -5147,6 +5159,376 @@ TEST(MetaServiceTest, UpdateDeleteBitmapWithBigKeys) {
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
}
+static void set_partition_version(MetaServiceProxy* meta_service,
std::string_view instance_id,
+ int64_t db_id, int64_t table_id, int64_t
partition_id,
+ int64_t version, std::vector<int64_t>
pending_txn_ids = {}) {
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
+ std::string ver_val;
+ VersionPB version_pb;
+ version_pb.set_version(version);
+ if (!pending_txn_ids.empty()) {
+ for (auto txn_id : pending_txn_ids) {
+ version_pb.add_pending_txn_ids(txn_id);
+ }
+ }
+ ASSERT_TRUE(version_pb.SerializeToString(&ver_val));
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ txn->put(ver_key, ver_val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+static void begin_txn_and_commit_rowset(MetaServiceProxy* meta_service, const
std::string& label,
+ int64_t db_id, int64_t table_id,
int64_t partition_id,
+ int64_t tablet_id, int64_t* txn_id) {
+ begin_txn(meta_service, db_id, label, table_id, *txn_id);
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(*txn_id, tablet_id, partition_id);
+ prepare_rowset(meta_service, rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ res.Clear();
+ commit_rowset(meta_service, rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+}
+
+static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service,
int64_t table_id,
+ int64_t partition_id, int64_t
lock_id,
+ int64_t initiator) {
+ brpc::Controller cntl;
+ GetDeleteBitmapUpdateLockRequest get_lock_req;
+ GetDeleteBitmapUpdateLockResponse get_lock_res;
+ get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
+ get_lock_req.set_table_id(table_id);
+ get_lock_req.add_partition_ids(partition_id);
+ get_lock_req.set_expiration(5);
+ get_lock_req.set_lock_id(lock_id);
+ get_lock_req.set_initiator(initiator);
+ meta_service->get_delete_bitmap_update_lock(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&get_lock_req,
+ &get_lock_res, nullptr);
+ ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);
+}
+
+static void update_delete_bitmap(MetaServiceProxy* meta_service,
+ UpdateDeleteBitmapRequest&
update_delete_bitmap_req,
+ UpdateDeleteBitmapResponse&
update_delete_bitmap_res,
+ int64_t table_id, int64_t partition_id,
int64_t lock_id,
+ int64_t initiator, int64_t tablet_id, int64_t
txn_id,
+ int64_t next_visible_version, std::string
data = "1111") {
+ brpc::Controller cntl;
+ update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+ update_delete_bitmap_req.set_table_id(table_id);
+ update_delete_bitmap_req.set_partition_id(partition_id);
+ update_delete_bitmap_req.set_lock_id(lock_id);
+ update_delete_bitmap_req.set_initiator(initiator);
+ update_delete_bitmap_req.set_tablet_id(tablet_id);
+ update_delete_bitmap_req.set_txn_id(txn_id);
+ update_delete_bitmap_req.set_next_visible_version(next_visible_version);
+ update_delete_bitmap_req.add_rowset_ids("123");
+ update_delete_bitmap_req.add_segment_ids(0);
+ update_delete_bitmap_req.add_versions(next_visible_version);
+ update_delete_bitmap_req.add_segment_delete_bitmaps(data);
+
meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+ &update_delete_bitmap_req,
&update_delete_bitmap_res,
+ nullptr);
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersion) {
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ {
+ // 1. normal path
+ // 1.1 has partition version and request version matches
+ int64_t db_id = 999;
+ int64_t table_id = 1001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 1);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+
+ {
+ // 1. normal path
+ // 1.2 does not have partition version KV and request version matches
+ int64_t db_id = 999;
+ int64_t table_id = 1002;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label12", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id, 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+
+ {
+ // 1. normal path
+ // 1.3 has partition version and pending txn, and request version
matches
+ int64_t db_id = 999;
+ int64_t table_id = 1003;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 120;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label13", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version, {12345});
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersionFail) {
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ {
+ // 2. abnormal path
+ // 2.1 has partition version but request version does not match
+ int64_t db_id = 999;
+ int64_t table_id = 2001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label21", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+
+ {
+ // 2. abnormal path
+ // 2.2 does not have partition version KV and request version does not
match
+ int64_t db_id = 999;
+ int64_t table_id = 2002;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label22", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // first load, wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id, 10);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+
+ {
+ // 2. abnormal path
+ // 2.3 has partition version and pending txn, and request version
matches
+ int64_t db_id = 999;
+ int64_t table_id = 2003;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 120;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label23", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version, {12345});
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 1);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapFailCase) {
+ // simulate the situation described in
https://github.com/apache/doris/pull/49710
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ int64_t db_id = 1999;
+ int64_t table_id = 1001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ set_partition_version(meta_service.get(), instance_id, db_id, table_id,
t1p1, cur_max_version);
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id, index_id,
+ t1p1, tablet_id));
+
+ // txn1 begins
+ int64_t txn_id1;
+ begin_txn_and_commit_rowset(meta_service.get(), "label31", db_id,
table_id, t1p1, tablet_id,
+ &txn_id1);
+ int64_t txn1_version_to_publish = cur_max_version + 1;
+ // txn1 gains the lock and try to publish with version 101
+ int64_t lock_id = txn_id1;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ // txn1 failed due to calculation timeout and removes the delete bitmap
lock
+ RemoveDeleteBitmapUpdateLockRequest remove_req;
+ RemoveDeleteBitmapUpdateLockResponse remove_res;
+ remove_req.set_cloud_unique_id("test_cloud_unique_id");
+ remove_req.set_table_id(table_id);
+ remove_req.set_lock_id(lock_id);
+ remove_req.set_initiator(-1);
+ meta_service->remove_delete_bitmap_update_lock(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&remove_req, &remove_res,
+ nullptr);
+ ASSERT_EQ(remove_res.status().code(), MetaServiceCode::OK);
+
+ // txn2 gains the lock and succeeds to publish with version 101
+ int64_t txn_id2;
+ begin_txn_and_commit_rowset(meta_service.get(), "label32", db_id,
table_id, t1p1, tablet_id,
+ &txn_id2);
+ lock_id = txn_id2;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ int64_t txn2_version_to_publish = cur_max_version + 1;
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ std::string data1 = "1234";
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id2,
+ txn2_version_to_publish, data1);
+
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id2);
+ req.add_mow_table_ids(table_id);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
t1p1});
+ std::string ver_val;
+ VersionPB version_pb;
+ auto ret = txn->get(ver_key, &ver_val);
+ ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
+ ASSERT_TRUE(version_pb.ParseFromString(ver_val));
+ ASSERT_EQ(version_pb.version(), cur_max_version + 1);
+
+ std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id,
table_id, -1});
+ std::string lock_val;
+ ret = txn->get(lock_key, &lock_val);
+ ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ // txn1 retries to publish and gains the lock, try to publish with version
102
+ lock_id = txn_id1;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ // txn1's previous calculation task finshes and try to update delete
bitmap with version 101
+ std::string data2 = "5678";
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id1,
+ txn1_version_to_publish, data2);
+ // this should fail
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+
+ GetDeleteBitmapRequest get_delete_bitmap_req;
+ GetDeleteBitmapResponse get_delete_bitmap_res;
+ get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+ get_delete_bitmap_req.set_tablet_id(tablet_id);
+ get_delete_bitmap_req.add_rowset_ids("123");
+ get_delete_bitmap_req.add_begin_versions(0);
+ get_delete_bitmap_req.add_end_versions(cur_max_version + 1);
+
meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+ &get_delete_bitmap_req,
&get_delete_bitmap_res, nullptr);
+ ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1);
+}
+
TEST(MetaServiceTest, UpdateDeleteBitmap) {
auto meta_service = get_meta_service();
remove_delete_bitmap_lock(meta_service.get(), 112);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 165528455e1..0bdca64ff6c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1394,6 +1394,7 @@ enum MetaServiceCode {
LOCK_EXPIRED = 8001;
LOCK_CONFLICT = 8002;
ROWSETS_EXPIRED = 8003;
+ VERSION_NOT_MATCH = 8004;
// partial update
ROWSET_META_NOT_FOUND = 9001;
@@ -1422,6 +1423,9 @@ message UpdateDeleteBitmapRequest {
// to determine whether this is in an explicit txn and whether it's the
first sub txn
optional bool is_explicit_txn = 12;
optional int64 txn_id = 13;
+
+ // for load txn only
+ optional int64 next_visible_version = 14;
}
message UpdateDeleteBitmapResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]