zhannngchen commented on code in PR #48024: URL: https://github.com/apache/doris/pull/48024#discussion_r1979008104
########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2196,6 +2222,49 @@ void MetaServiceImpl::get_delete_bitmap(google::protobuf::RpcController* control } } +static bool put_mow_tablet_compaction_key(MetaServiceCode& code, std::string& msg, + std::unique_ptr<Transaction>& txn, + std::string& instance_id, int64_t table_id, + int64_t lock_id, int64_t initiator, int64_t expiration, + std::string& current_lock_msg) { + std::string tablet_compaction_key = + mow_tablet_compaction_key({instance_id, table_id, initiator}); + std::string tablet_compaction_val; + MowTabletCompactionPB mow_tablet_compaction; + mow_tablet_compaction.set_expiration(expiration); + mow_tablet_compaction.SerializeToString(&tablet_compaction_val); + if (tablet_compaction_val.empty()) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + msg = "MowTabletCompactionPB serialization error"; + return false; + } + txn->put(tablet_compaction_key, tablet_compaction_val); + LOG(INFO) << "xxx put tablet compaction key=" << hex(tablet_compaction_key) Review Comment: put mow tablet compaction initiator key ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2215,79 +2284,210 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; return; } - - RPC_RATE_LIMIT(get_delete_bitmap_update_lock) - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to init txn"; - return; - } auto table_id = request->table_id(); std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; - err = txn->get(lock_key, &lock_val); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - ss << "failed to get delete bitmap update lock, instance_id=" << instance_id - << " table_id=" << 
table_id << " key=" << hex(lock_key) << " err=" << err; - msg = ss.str(); - code = MetaServiceCode::KV_TXN_GET_ERR; - return; - } - using namespace std::chrono; - int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); - if (err == TxnErrorCode::TXN_OK) { - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "failed to parse DeleteBitmapUpdateLockPB"; + bool first_retry = true; + while (true) { + response->Clear(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; return; } - if (lock_info.expiration() > 0 && lock_info.expiration() < now) { - LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" - << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; - lock_info.clear_initiators(); - } else if (lock_info.lock_id() != request->lock_id()) { - ss << "already be locked. 
request lock_id=" << request->lock_id() - << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id - << " now=" << now << " expiration=" << lock_info.expiration(); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap update lock, instance_id=" << instance_id + << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = MetaServiceCode::LOCK_CONFLICT; + code = MetaServiceCode::KV_TXN_GET_ERR; return; } - } + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + int64_t expiration = now + request->expiration(); + bool lock_key_not_found = false; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + lock_key_not_found = true; + std::string current_lock_msg = "lock key not found"; + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + lock_info.add_initiators(request->initiator()); + } else { + // in normal case, this should remove 0 kvs + // but when upgrade ms, if there are ms with old and new versions, it works + std::string tablet_compaction_key_begin = + mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string tablet_compaction_key_end = + mow_tablet_compaction_key({instance_id, table_id, INT64_MAX}); + txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end); + LOG(INFO) << "remove mow tablet compaction kv, begin=" + << hex(tablet_compaction_key_begin) + << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, 
request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + return; + } + if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + if (lock_info.expiration() > 0 && lock_info.expiration() < now) { + LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" + << lock_info.lock_id() << " table_id=" << table_id + << " expiration=" << lock_info.expiration() << " now=" << now + << " initiator_size=" << lock_info.initiators_size(); + lock_info.clear_initiators(); + lock_info.clear_expiration(); + } else if (lock_info.lock_id() != request->lock_id()) { + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << lock_info.expiration() << " now=" << now + << ", request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + std::string current_lock_msg = + "original lock_id=" + std::to_string(lock_info.lock_id()); + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + bool found = false; + for (auto initiator : lock_info.initiators()) { + if (request->initiator() == initiator) { + found = true; + break; + } + } + if (!found) { + lock_info.add_initiators(request->initiator()); + } + } else { + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, Review Comment: should clear all initiator keys here as well? 
like L2321 ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2215,79 +2284,210 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; return; } - - RPC_RATE_LIMIT(get_delete_bitmap_update_lock) - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to init txn"; - return; - } auto table_id = request->table_id(); std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; - err = txn->get(lock_key, &lock_val); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - ss << "failed to get delete bitmap update lock, instance_id=" << instance_id - << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; - msg = ss.str(); - code = MetaServiceCode::KV_TXN_GET_ERR; - return; - } - using namespace std::chrono; - int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); - if (err == TxnErrorCode::TXN_OK) { - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "failed to parse DeleteBitmapUpdateLockPB"; + bool first_retry = true; + while (true) { + response->Clear(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; return; } - if (lock_info.expiration() > 0 && lock_info.expiration() < now) { - LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" - << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; - lock_info.clear_initiators(); - } else if (lock_info.lock_id() != request->lock_id()) { - ss << "already be locked. 
request lock_id=" << request->lock_id() - << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id - << " now=" << now << " expiration=" << lock_info.expiration(); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap update lock, instance_id=" << instance_id + << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = MetaServiceCode::LOCK_CONFLICT; + code = MetaServiceCode::KV_TXN_GET_ERR; return; } - } + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + int64_t expiration = now + request->expiration(); + bool lock_key_not_found = false; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + lock_key_not_found = true; + std::string current_lock_msg = "lock key not found"; + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + lock_info.add_initiators(request->initiator()); + } else { + // in normal case, this should remove 0 kvs + // but when upgrade ms, if there are ms with old and new versions, it works + std::string tablet_compaction_key_begin = + mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string tablet_compaction_key_end = + mow_tablet_compaction_key({instance_id, table_id, INT64_MAX}); + txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end); + LOG(INFO) << "remove mow tablet compaction kv, begin=" + << hex(tablet_compaction_key_begin) + << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, 
request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + return; + } + if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + if (lock_info.expiration() > 0 && lock_info.expiration() < now) { + LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" + << lock_info.lock_id() << " table_id=" << table_id + << " expiration=" << lock_info.expiration() << " now=" << now + << " initiator_size=" << lock_info.initiators_size(); + lock_info.clear_initiators(); + lock_info.clear_expiration(); + } else if (lock_info.lock_id() != request->lock_id()) { + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << lock_info.expiration() << " now=" << now + << ", request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + std::string current_lock_msg = + "original lock_id=" + std::to_string(lock_info.lock_id()); + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + bool found = false; + for (auto initiator : lock_info.initiators()) { + if (request->initiator() == initiator) { + found = true; + break; + } + } + if (!found) { + lock_info.add_initiators(request->initiator()); + } + } else { + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else { + if (request->lock_id() == 
COMPACTION_DELETE_BITMAP_LOCK_ID) { + std::string current_lock_msg = "locked by lock_id=-1"; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } else { + // check if compaction key is expired + bool has_unexpired_compaction = false; + int64_t unexpired_expiration = 0; + std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0}); Review Comment: we should make sure that `table_id + 1` is greater than `table_id` after encode. e.g. "9" is bigger than "10" in lexicographical order ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2215,79 +2284,210 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; return; } - - RPC_RATE_LIMIT(get_delete_bitmap_update_lock) - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to init txn"; - return; - } auto table_id = request->table_id(); std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; - err = txn->get(lock_key, &lock_val); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - ss << "failed to get delete bitmap update lock, instance_id=" << instance_id - << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; - msg = ss.str(); - code = MetaServiceCode::KV_TXN_GET_ERR; - return; - } - using namespace std::chrono; - int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); - if (err == TxnErrorCode::TXN_OK) { - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = 
"failed to parse DeleteBitmapUpdateLockPB"; + bool first_retry = true; + while (true) { + response->Clear(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; return; } - if (lock_info.expiration() > 0 && lock_info.expiration() < now) { - LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" - << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; - lock_info.clear_initiators(); - } else if (lock_info.lock_id() != request->lock_id()) { - ss << "already be locked. request lock_id=" << request->lock_id() - << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id - << " now=" << now << " expiration=" << lock_info.expiration(); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap update lock, instance_id=" << instance_id + << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = MetaServiceCode::LOCK_CONFLICT; + code = MetaServiceCode::KV_TXN_GET_ERR; return; } - } + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + int64_t expiration = now + request->expiration(); + bool lock_key_not_found = false; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + lock_key_not_found = true; + std::string current_lock_msg = "lock key not found"; + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + lock_info.add_initiators(request->initiator()); + } else { + // in normal case, this should remove 0 kvs + // but when upgrade ms, if there are ms with old and new versions, it works + std::string tablet_compaction_key_begin = + 
mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string tablet_compaction_key_end = + mow_tablet_compaction_key({instance_id, table_id, INT64_MAX}); + txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end); + LOG(INFO) << "remove mow tablet compaction kv, begin=" + << hex(tablet_compaction_key_begin) + << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + return; + } + if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + if (lock_info.expiration() > 0 && lock_info.expiration() < now) { + LOG(INFO) << "delete bitmap lock expired, continue to process. 
lock_id=" + << lock_info.lock_id() << " table_id=" << table_id + << " expiration=" << lock_info.expiration() << " now=" << now + << " initiator_size=" << lock_info.initiators_size(); + lock_info.clear_initiators(); + lock_info.clear_expiration(); + } else if (lock_info.lock_id() != request->lock_id()) { + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << lock_info.expiration() << " now=" << now + << ", request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + std::string current_lock_msg = + "original lock_id=" + std::to_string(lock_info.lock_id()); + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + bool found = false; + for (auto initiator : lock_info.initiators()) { + if (request->initiator() == initiator) { + found = true; + break; + } + } + if (!found) { + lock_info.add_initiators(request->initiator()); + } + } else { + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else { + if (request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID) { + std::string current_lock_msg = "locked by lock_id=-1"; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } else { + // check if compaction key is expired + bool has_unexpired_compaction = false; + int64_t unexpired_expiration = 0; + std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string key1 = 
mow_tablet_compaction_key({instance_id, table_id + 1, 0}); + MowTabletCompactionPB mow_tablet_compaction; + std::unique_ptr<RangeGetIterator> it; + int64_t expired_compaction_num = 0; + do { + err = txn->get(key0, key1, &it); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "internal error, failed to get mow tablet compaction, err=" + << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next() && !has_unexpired_compaction) { + auto [k, v] = it->next(); + if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size())) + [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse MowTabletCompactionPB"; + return; + } + if (mow_tablet_compaction.expiration() > 0 && + mow_tablet_compaction.expiration() < now) { + LOG(INFO) << "remove mow tablet compaction lock. table_id=" + << table_id << " lock_id=" << lock_info.lock_id() + << " expiration=" << mow_tablet_compaction.expiration() + << " now=" << now << " key=" << hex(k); + txn->remove(k); + expired_compaction_num++; + } else { + has_unexpired_compaction = true; + unexpired_expiration = mow_tablet_compaction.expiration(); + } + } + key0.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more() && !has_unexpired_compaction); + if (has_unexpired_compaction) { + // TODO print initiator + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << unexpired_expiration << " now=" << now + << ". 
request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + // all compaction is expired + lock_info.set_lock_id(request->lock_id()); + lock_info.set_expiration(expiration); + lock_info.clear_initiators(); + lock_info.add_initiators(request->initiator()); + std::string current_lock_msg = + std::to_string(expired_compaction_num) + " compaction is expired"; + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, + request->lock_id(), request->initiator(), + lock_key, lock_info, current_lock_msg)) { + return; + } + } + } + } - lock_info.set_lock_id(request->lock_id()); - lock_info.set_expiration(now + request->expiration()); - bool found = false; - for (auto initiator : lock_info.initiators()) { - if (request->initiator() == initiator) { - found = true; + err = txn->commit(); + TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock:commit:conflict", &first_retry, + &err); + if (err == TxnErrorCode::TXN_OK) { break; + } else if (err == TxnErrorCode::TXN_CONFLICT && lock_key_not_found && + request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID && Review Comment: We should let the load request have a fast retry here, rather than the compaction request. The compaction request should fast-retry while releasing the lock. ########## cloud/src/meta-service/keys.cpp: ########## @@ -497,6 +502,13 @@ std::string system_meta_service_encryption_key_info_key() { // Other keys //============================================================================== +void mow_tablet_compaction_key(const MowTabletCompactionInfo& in, std::string* out) { Review Comment: Would `mow_tablet_compaction_initiator_key` be a better name?
########## cloud/src/meta-service/meta_service.cpp: ########## @@ -1777,17 +1779,39 @@ static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, st code = MetaServiceCode::LOCK_EXPIRED; return false; } - bool found = false; - for (auto initiator : lock_info.initiators()) { - if (lock_initiator == initiator) { - found = true; - break; + if (lock_id == COMPACTION_DELETE_BITMAP_LOCK_ID) { + std::string tablet_compaction_key = + mow_tablet_compaction_key({instance_id, table_id, lock_initiator}); + std::string tablet_compaction_val; + err = txn->get(tablet_compaction_key, &tablet_compaction_val); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "tablet compaction key not found, table_id=" << table_id << " lock_id" << lock_id + << " initiator=" << lock_initiator; + msg = ss.str(); + code = MetaServiceCode::LOCK_EXPIRED; + return false; + } + if (err != TxnErrorCode::TXN_OK) { + ss << "failed to get tablet compaction info, err=" << err; + msg = ss.str(); + code = cast_as<ErrCategory::READ>(err); + return false; + } + // TODO does we need check expired time Review Comment: we don't need to check the expire time ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2215,79 +2284,210 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; return; } - - RPC_RATE_LIMIT(get_delete_bitmap_update_lock) - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to init txn"; - return; - } auto table_id = request->table_id(); std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; - err = txn->get(lock_key, &lock_val); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - ss << "failed to get delete bitmap update 
lock, instance_id=" << instance_id - << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; - msg = ss.str(); - code = MetaServiceCode::KV_TXN_GET_ERR; - return; - } - using namespace std::chrono; - int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); - if (err == TxnErrorCode::TXN_OK) { - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "failed to parse DeleteBitmapUpdateLockPB"; + bool first_retry = true; + while (true) { + response->Clear(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; return; } - if (lock_info.expiration() > 0 && lock_info.expiration() < now) { - LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" - << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; - lock_info.clear_initiators(); - } else if (lock_info.lock_id() != request->lock_id()) { - ss << "already be locked. 
request lock_id=" << request->lock_id() - << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id - << " now=" << now << " expiration=" << lock_info.expiration(); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap update lock, instance_id=" << instance_id + << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = MetaServiceCode::LOCK_CONFLICT; + code = MetaServiceCode::KV_TXN_GET_ERR; return; } - } + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + int64_t expiration = now + request->expiration(); + bool lock_key_not_found = false; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + lock_key_not_found = true; + std::string current_lock_msg = "lock key not found"; + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + lock_info.add_initiators(request->initiator()); + } else { + // in normal case, this should remove 0 kvs + // but when upgrade ms, if there are ms with old and new versions, it works + std::string tablet_compaction_key_begin = + mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string tablet_compaction_key_end = + mow_tablet_compaction_key({instance_id, table_id, INT64_MAX}); + txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end); + LOG(INFO) << "remove mow tablet compaction kv, begin=" + << hex(tablet_compaction_key_begin) + << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, 
request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + return; + } + if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + if (lock_info.expiration() > 0 && lock_info.expiration() < now) { + LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" + << lock_info.lock_id() << " table_id=" << table_id + << " expiration=" << lock_info.expiration() << " now=" << now + << " initiator_size=" << lock_info.initiators_size(); + lock_info.clear_initiators(); + lock_info.clear_expiration(); + } else if (lock_info.lock_id() != request->lock_id()) { + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << lock_info.expiration() << " now=" << now + << ", request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + std::string current_lock_msg = + "original lock_id=" + std::to_string(lock_info.lock_id()); + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + bool found = false; + for (auto initiator : lock_info.initiators()) { + if (request->initiator() == initiator) { + found = true; + break; + } + } + if (!found) { + lock_info.add_initiators(request->initiator()); + } + } else { + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else { + if (request->lock_id() == 
COMPACTION_DELETE_BITMAP_LOCK_ID) { + std::string current_lock_msg = "locked by lock_id=-1"; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } else { + // check if compaction key is expired + bool has_unexpired_compaction = false; + int64_t unexpired_expiration = 0; + std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0}); + MowTabletCompactionPB mow_tablet_compaction; + std::unique_ptr<RangeGetIterator> it; + int64_t expired_compaction_num = 0; + do { + err = txn->get(key0, key1, &it); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "internal error, failed to get mow tablet compaction, err=" + << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next() && !has_unexpired_compaction) { + auto [k, v] = it->next(); + if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size())) + [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse MowTabletCompactionPB"; + return; + } + if (mow_tablet_compaction.expiration() > 0 && + mow_tablet_compaction.expiration() < now) { + LOG(INFO) << "remove mow tablet compaction lock. table_id=" + << table_id << " lock_id=" << lock_info.lock_id() + << " expiration=" << mow_tablet_compaction.expiration() + << " now=" << now << " key=" << hex(k); + txn->remove(k); + expired_compaction_num++; + } else { + has_unexpired_compaction = true; + unexpired_expiration = mow_tablet_compaction.expiration(); + } + } + key0.push_back('\x00'); // Update to next smallest key for iteration Review Comment: Is it necessary to update `key0`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org