mymeiyi commented on code in PR #48024: URL: https://github.com/apache/doris/pull/48024#discussion_r1980789523
########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2215,79 +2284,210 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; return; } - - RPC_RATE_LIMIT(get_delete_bitmap_update_lock) - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to init txn"; - return; - } auto table_id = request->table_id(); std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - DeleteBitmapUpdateLockPB lock_info; - err = txn->get(lock_key, &lock_val); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - ss << "failed to get delete bitmap update lock, instance_id=" << instance_id - << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; - msg = ss.str(); - code = MetaServiceCode::KV_TXN_GET_ERR; - return; - } - using namespace std::chrono; - int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); - if (err == TxnErrorCode::TXN_OK) { - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "failed to parse DeleteBitmapUpdateLockPB"; + bool first_retry = true; + while (true) { + response->Clear(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; return; } - if (lock_info.expiration() > 0 && lock_info.expiration() < now) { - LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" - << lock_info.lock_id() << " table_id=" << table_id << " now=" << now; - lock_info.clear_initiators(); - } else if (lock_info.lock_id() != request->lock_id()) { - ss << "already be locked. 
request lock_id=" << request->lock_id() - << " locked by lock_id=" << lock_info.lock_id() << " table_id=" << table_id - << " now=" << now << " expiration=" << lock_info.expiration(); + std::string lock_val; + DeleteBitmapUpdateLockPB lock_info; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap update lock, instance_id=" << instance_id + << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = MetaServiceCode::LOCK_CONFLICT; + code = MetaServiceCode::KV_TXN_GET_ERR; return; } - } + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + int64_t expiration = now + request->expiration(); + bool lock_key_not_found = false; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + lock_key_not_found = true; + std::string current_lock_msg = "lock key not found"; + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + lock_info.add_initiators(request->initiator()); + } else { + // in normal case, this should remove 0 kvs + // but when upgrade ms, if there are ms with old and new versions, it works + std::string tablet_compaction_key_begin = + mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string tablet_compaction_key_end = + mow_tablet_compaction_key({instance_id, table_id, INT64_MAX}); + txn->remove(tablet_compaction_key_begin, tablet_compaction_key_end); + LOG(INFO) << "remove mow tablet compaction kv, begin=" + << hex(tablet_compaction_key_begin) + << " end=" << hex(tablet_compaction_key_end) << " table_id=" << table_id; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, 
request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else if (err == TxnErrorCode::TXN_OK) { + if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + return; + } + if (lock_info.lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + if (lock_info.expiration() > 0 && lock_info.expiration() < now) { + LOG(INFO) << "delete bitmap lock expired, continue to process. lock_id=" + << lock_info.lock_id() << " table_id=" << table_id + << " expiration=" << lock_info.expiration() << " now=" << now + << " initiator_size=" << lock_info.initiators_size(); + lock_info.clear_initiators(); + lock_info.clear_expiration(); + } else if (lock_info.lock_id() != request->lock_id()) { + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << lock_info.expiration() << " now=" << now + << ", request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + std::string current_lock_msg = + "original lock_id=" + std::to_string(lock_info.lock_id()); + lock_info.set_lock_id(request->lock_id()); + if (request->lock_id() != COMPACTION_DELETE_BITMAP_LOCK_ID) { + lock_info.set_expiration(expiration); + bool found = false; + for (auto initiator : lock_info.initiators()) { + if (request->initiator() == initiator) { + found = true; + break; + } + } + if (!found) { + lock_info.add_initiators(request->initiator()); + } + } else { + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, request->lock_id(), + request->initiator(), lock_key, lock_info, + current_lock_msg)) { + return; + } + } else { + if (request->lock_id() == 
COMPACTION_DELETE_BITMAP_LOCK_ID) { + std::string current_lock_msg = "locked by lock_id=-1"; + if (!put_mow_tablet_compaction_key(code, msg, txn, instance_id, table_id, + request->lock_id(), request->initiator(), + expiration, current_lock_msg)) { + return; + } + } else { + // check if compaction key is expired + bool has_unexpired_compaction = false; + int64_t unexpired_expiration = 0; + std::string key0 = mow_tablet_compaction_key({instance_id, table_id, 0}); + std::string key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0}); + MowTabletCompactionPB mow_tablet_compaction; + std::unique_ptr<RangeGetIterator> it; + int64_t expired_compaction_num = 0; + do { + err = txn->get(key0, key1, &it); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "internal error, failed to get mow tablet compaction, err=" + << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next() && !has_unexpired_compaction) { + auto [k, v] = it->next(); + if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size())) + [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse MowTabletCompactionPB"; + return; + } + if (mow_tablet_compaction.expiration() > 0 && + mow_tablet_compaction.expiration() < now) { + LOG(INFO) << "remove mow tablet compaction lock. table_id=" + << table_id << " lock_id=" << lock_info.lock_id() + << " expiration=" << mow_tablet_compaction.expiration() + << " now=" << now << " key=" << hex(k); + txn->remove(k); + expired_compaction_num++; + } else { + has_unexpired_compaction = true; + unexpired_expiration = mow_tablet_compaction.expiration(); + } + } + key0.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more() && !has_unexpired_compaction); + if (has_unexpired_compaction) { + // TODO print initiator + ss << "already be locked by lock_id=" << lock_info.lock_id() + << " expiration=" << unexpired_expiration << " now=" << now + << ". 
request lock_id=" << request->lock_id() << " table_id=" << table_id + << " initiator=" << request->initiator(); + msg = ss.str(); + code = MetaServiceCode::LOCK_CONFLICT; + return; + } + // all compaction is expired + lock_info.set_lock_id(request->lock_id()); + lock_info.set_expiration(expiration); + lock_info.clear_initiators(); + lock_info.add_initiators(request->initiator()); + std::string current_lock_msg = + std::to_string(expired_compaction_num) + " compaction is expired"; + if (!put_delete_bitmap_update_lock_key(code, msg, txn, table_id, + request->lock_id(), request->initiator(), + lock_key, lock_info, current_lock_msg)) { + return; + } + } + } + } - lock_info.set_lock_id(request->lock_id()); - lock_info.set_expiration(now + request->expiration()); - bool found = false; - for (auto initiator : lock_info.initiators()) { - if (request->initiator() == initiator) { - found = true; + err = txn->commit(); + TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock:commit:conflict", &first_retry, + &err); + if (err == TxnErrorCode::TXN_OK) { break; + } else if (err == TxnErrorCode::TXN_CONFLICT && lock_key_not_found && + request->lock_id() == COMPACTION_DELETE_BITMAP_LOCK_ID && Review Comment: and the load fast retry cannot get the lock because the lock is already held -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org