This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 67d8e0c4fe3 [fix](cloud) Fix versioned read DCHECK conditions (#55311)
67d8e0c4fe3 is described below
commit 67d8e0c4fe33d8f45e54619996e57076ffdacaf7
Author: walter <[email protected]>
AuthorDate: Wed Aug 27 10:57:59 2025 +0800
[fix](cloud) Fix versioned read DCHECK conditions (#55311)
---
cloud/src/meta-service/meta_service.cpp | 3 ++-
cloud/src/meta-service/meta_service_txn.cpp | 16 ++++++++++++----
cloud/src/meta-store/txn_kv.cpp | 4 +++-
cloud/src/recycler/recycler_operation_log.cpp | 20 +++++++++++---------
4 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 2ce9fa69df9..76fd82e20c8 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -3946,7 +3946,8 @@ void
MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control
}
} else {
MetaReader reader(instance_id);
- TxnErrorCode err = reader.get_tablet_merged_stats(tablet_id,
&tablet_stat, nullptr);
+ TxnErrorCode err =
+ reader.get_tablet_merged_stats(txn.get(), tablet_id,
&tablet_stat, nullptr);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get versioned tablet stats,
err={}, tablet_id={}", err,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index d5113af1a31..ce128f6d30e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3724,7 +3724,8 @@ void
MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
* @return TxnErrorCode
*/
TxnErrorCode internal_clean_label(std::shared_ptr<TxnKv> txn_kv, const
std::string_view instance_id,
- int64_t db_id, const std::string_view
label_key, KVStats& stats) {
+ int64_t db_id, const std::string_view
label_key, KVStats& stats,
+ bool is_versioned_write) {
std::string label_val;
TxnLabelPB label_pb;
@@ -3813,7 +3814,10 @@ TxnErrorCode internal_clean_label(std::shared_ptr<TxnKv>
txn_kv, const std::stri
continue;
}
- DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_OK);
+ // In versioned write, the recycle key will be write only when the txn
operation log is recycled.
+ if (!is_versioned_write) {
+ DCHECK_EQ(txn->get(recycle_key, &recycle_val),
TxnErrorCode::TXN_OK);
+ }
DCHECK((txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) ||
(txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE));
@@ -3885,6 +3889,8 @@ void
MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
RPC_RATE_LIMIT(clean_txn_label)
const int64_t db_id = request->db_id();
+ bool is_versioned_write = is_version_write_enabled(instance_id);
+
// clean label only by db_id
if (request->labels().empty()) {
std::string begin_label_key = txn_label_key({instance_id, db_id, ""});
@@ -3930,7 +3936,8 @@ void
MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
begin_label_key = k;
LOG(INFO) << "iterator has no more kvs. key=" << hex(k);
}
- err = internal_clean_label(txn_kv_, instance_id, db_id, k,
stats);
+ err = internal_clean_label(txn_kv_, instance_id, db_id, k,
stats,
+ is_versioned_write);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to clean txn label. err={}",
err);
@@ -3943,7 +3950,8 @@ void
MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
} else {
const std::string& label = request->labels(0);
const std::string label_key = txn_label_key({instance_id, db_id,
label});
- TxnErrorCode err = internal_clean_label(txn_kv_, instance_id, db_id,
label_key, stats);
+ TxnErrorCode err = internal_clean_label(txn_kv_, instance_id, db_id,
label_key, stats,
+ is_versioned_write);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to clean txn label. err={}", err);
diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp
index a3b890663ea..1da13d5f6d9 100644
--- a/cloud/src/meta-store/txn_kv.cpp
+++ b/cloud/src/meta-store/txn_kv.cpp
@@ -860,7 +860,9 @@ TxnErrorCode Transaction::batch_scan(
FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
FullRangeGetOptions opts)
: opts_(std::move(opts)), begin_(std::move(begin)),
end_(std::move(end)) {
- DCHECK(dynamic_cast<FdbTxnKv*>(opts_.txn_kv.get()));
+ if (opts_.txn_kv) {
+ DCHECK(dynamic_cast<FdbTxnKv*>(opts_.txn_kv.get()));
+ }
DCHECK(!opts_.txn || dynamic_cast<fdb::Transaction*>(opts_.txn)) <<
opts_.txn;
}
diff --git a/cloud/src/recycler/recycler_operation_log.cpp
b/cloud/src/recycler/recycler_operation_log.cpp
index de4e704e21a..fe7476ee308 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -425,24 +425,18 @@ int OperationLogRecycler::commit() {
std::string log_key =
encode_versioned_key(versioned::log_key(instance_id_), log_version_);
// Remove the operation log entry itself after recycling its contents
- LOG_INFO("remove operation log key")
- .tag("instance_id", instance_id_)
- .tag("log_key", hex(log_key))
- .tag("log_version", log_version_.to_string());
+ LOG_INFO("remove operation log key").tag("log_version",
log_version_.to_string());
txn->remove(log_key);
for (const auto& key : keys_to_remove_) {
// Remove versioned keys that were replaced during operation log
processing
- LOG_INFO("remove versioned key").tag("instance_id",
instance_id_).tag("key", hex(key));
+ LOG_INFO("remove versioned key").tag("key", hex(key));
txn->remove(key);
}
for (const auto& [key, value] : kvs_) {
// Put recycled metadata entries (recycle partition, recycle index,
recycle rowset, etc.)
- LOG_INFO("put recycled metadata key")
- .tag("instance_id", instance_id_)
- .tag("key", hex(key))
- .tag("value_size", value.size());
+ LOG_INFO("put recycled metadata key").tag("key",
hex(key)).tag("value_size", value.size());
txn->put(key, value);
}
@@ -548,6 +542,14 @@ int InstanceRecycler::recycle_operation_logs() {
return -1;
}
+ if (!operation_log.has_min_timestamp()) {
+ LOG_WARNING("operation log has not set the min_timestamp")
+ .tag("key", hex(key))
+ .tag("version", versionstamp.version())
+ .tag("order", versionstamp.order())
+ .tag("log", operation_log.ShortDebugString());
+ }
+
bool need_recycle = true; // Always recycle operation logs for now
if (need_recycle) {
AnnotateTag tag("log_key", hex(log_key));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]