zhannngchen commented on code in PR #40204: URL: https://github.com/apache/doris/pull/40204#discussion_r1756707509
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -272,6 +272,11 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { std::shared_lock meta_rlock(_context.tablet->get_header_lock()); specified_rowsets = _context.tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids); } + uint64_t delete_bitmap_count = 0; + { + std::shared_lock l(_context.tablet->tablet_meta()->delete_bitmap().lock); + delete_bitmap_count = _context.tablet->tablet_meta()->delete_bitmap().delete_bitmap.size(); Review Comment: add a `size()` method for DeleteBitmap class ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -2080,6 +2080,168 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl } } +void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, + const RemoveDeleteBitmapRequest* request, + RemoveDeleteBitmapResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(remove_delete_bitmap); + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + if (cloud_unique_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "cloud unique id not set"; + return; + } + + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + RPC_RATE_LIMIT(remove_delete_bitmap) + // remove delete bitmap of input rowset for MoW table + auto tablet_id = request->tablet_id(); + auto& rowset_ids = request->rowset_ids(); + auto& begin_versions = request->begin_versions(); + auto& end_versions = request->end_versions(); + if (rowset_ids.size() != begin_versions.size() || rowset_ids.size() != end_versions.size()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "rowset and version size not match. 
" + << " rowset_size=" << rowset_ids.size() + << " begin_version_size=" << begin_versions.size() + << " end_version_size=" << end_versions.size(); + msg = ss.str(); + return; + } + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to init txn"; + return; + } + for (size_t i = 0; i < rowset_ids.size(); i++) { + auto delete_bitmap_start = meta_delete_bitmap_key( + {instance_id, tablet_id, rowset_ids[i], begin_versions[i], 0}); + auto delete_bitmap_end = meta_delete_bitmap_key( + {instance_id, tablet_id, rowset_ids[i], end_versions[i], INT64_MAX}); + txn->remove(delete_bitmap_start, delete_bitmap_end); + } + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "failed to commit job kv, err=" << err; + msg = ss.str(); + return; + } +} + +void MetaServiceImpl::remove_delete_bitmap_update_lock( + google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) { + RPC_PREPROCESS(remove_delete_bitmap_update_lock); + std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : ""; + if (cloud_unique_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "cloud unique id not set"; + return; + } + + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; + return; + } + RPC_RATE_LIMIT(remove_delete_bitmap_update_lock) + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; + return; + } + auto table_id = request->table_id(); + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + LOG(INFO) << "(" << instance_id << ")" + << "start to remove delete bitmap lock initiator, table_id=" << table_id + << ", key=" << hex(lock_key) << ", initiator=" << request->initiator(); + std::string lock_val; + err = txn->get(lock_key, &lock_val); + if (err != TxnErrorCode::TXN_OK) { Review Comment: how about NOT_FOUND? 
########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -340,9 +341,125 @@ Status CloudCumulativeCompaction::modify_rowsets() { stats.num_rows(), stats.data_size()); } } + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { Review Comment: ```suggestion if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) { ``` ########## fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java: ########## @@ -782,8 +784,8 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", transactionId, retryTime, e); } - // sleep random millis [20, 200] ms, avoid txn conflict - int randomMillis = 20 + (int) (Math.random() * (200 - 20)); + // sleep random millis [20, 300] ms, avoid txn conflict Review Comment: Why was it changed to 300? ########## be/src/olap/tablet_meta.cpp: ########## @@ -1166,6 +1168,36 @@ void DeleteBitmap::merge(const DeleteBitmap& other) { } } +void DeleteBitmap::add_to_remove_queue( + const std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>& tuple, + int64_t time_stamp) { + std::shared_lock l(stale_delete_bitmap_lock); + _stale_delete_bitmap.emplace(tuple, time_stamp); +} + +void DeleteBitmap::remove_stale_delete_bitmap_from_queue() { + std::shared_lock l(stale_delete_bitmap_lock); + auto now = UnixMillis(); + auto it = _stale_delete_bitmap.begin(); + while (it != _stale_delete_bitmap.end()) { + if (now - it->second > config::tablet_rowset_stale_sweep_time_sec * 1000) { + auto tablet_id = std::get<0>(it->first); + auto start_bmk = std::get<1>(it->first); + auto end_bmk = std::get<2>(it->first); + remove(start_bmk, end_bmk); + + std::vector<std::string> pre_rowset_ids {}; + pre_rowset_ids.emplace_back(std::get<0>(start_bmk).to_string()); + CloudStorageEngine& engine = 
ExecEnv::GetInstance()->storage_engine().to_cloud(); + auto st = engine.meta_mgr().remove_old_version_delete_bitmap(tablet_id, pre_rowset_ids, Review Comment: Why not request in batch? ########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -340,9 +341,125 @@ Status CloudCumulativeCompaction::modify_rowsets() { stats.num_rows(), stats.data_size()); } } + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { + process_old_version_delete_bitmap(); + } return Status::OK(); } +void CloudCumulativeCompaction::process_old_version_delete_bitmap() { + // agg previously rowset old version delete bitmap + std::vector<RowsetSharedPtr> pre_rowsets {}; + std::vector<std::string> pre_rowset_ids {}; + for (const auto& it : cloud_tablet()->rowset_map()) { + if (it.first.second < _input_rowsets.front()->start_version()) { + pre_rowsets.emplace_back(it.second); + pre_rowset_ids.emplace_back(it.second->rowset_id().to_string()); + } + } + std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); + pre_rowsets.erase(pre_rowsets.begin()); + if (!pre_rowsets.empty()) { + auto pre_max_version = _output_rowset->version().second; + DeleteBitmapPtr new_delete_bitmap = + std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id()); + std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>> + to_remove_vec; + for (auto& rowset : pre_rowsets) { + for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { + rowset->rowset_id().to_string(); + DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; + DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; + DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id, + pre_max_version - 1}; + auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( + {rowset->rowset_id(), seg_id, pre_max_version}); + to_remove_vec.emplace_back( + std::make_tuple(_tablet->tablet_id(), start, before_end)); + if 
(d->isEmpty()) { + continue; + } + new_delete_bitmap->set(end, *d); + } + } + Status get_st; + Status update_st; + Status remove_st; + do { + if (!new_delete_bitmap->empty()) { + // store agg delete bitmap + int64_t initiator = boost::hash_range(_uuid.begin(), _uuid.end()) & + std::numeric_limits<int64_t>::max(); + DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.get_mow_lock_failed", { + get_st = Status::InternalError( + "test fail to get_delete_bitmap_update_lock for " + "tablet_id {}", + cloud_tablet()->tablet_id()); + }); + if (get_st.ok()) { + get_st = _engine.meta_mgr().get_delete_bitmap_update_lock(*cloud_tablet(), -1, Review Comment: Do we need to add some retries here? I think it's quite easy to fail to get the lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org