zhannngchen commented on code in PR #40204:
URL: https://github.com/apache/doris/pull/40204#discussion_r1756707509


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -272,6 +272,11 @@ Status 
BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
         std::shared_lock meta_rlock(_context.tablet->get_header_lock());
         specified_rowsets = 
_context.tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
     }
+    uint64_t delete_bitmap_count = 0;
+    {
+        std::shared_lock 
l(_context.tablet->tablet_meta()->delete_bitmap().lock);
+        delete_bitmap_count = 
_context.tablet->tablet_meta()->delete_bitmap().delete_bitmap.size();

Review Comment:
   Add a `size()` method to the `DeleteBitmap` class.



##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -2080,6 +2080,168 @@ void 
MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
     }
 }
 
+void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* 
controller,
+                                           const RemoveDeleteBitmapRequest* 
request,
+                                           RemoveDeleteBitmapResponse* 
response,
+                                           ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(remove_delete_bitmap);
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    if (cloud_unique_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "cloud unique id not set";
+        return;
+    }
+
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(WARNING) << msg << ", cloud_unique_id=" << 
request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(remove_delete_bitmap)
+    // remove delete bitmap of input rowset for MoW table
+    auto tablet_id = request->tablet_id();
+    auto& rowset_ids = request->rowset_ids();
+    auto& begin_versions = request->begin_versions();
+    auto& end_versions = request->end_versions();
+    if (rowset_ids.size() != begin_versions.size() || rowset_ids.size() != 
end_versions.size()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "rowset and version size not match. "
+           << " rowset_size=" << rowset_ids.size()
+           << " begin_version_size=" << begin_versions.size()
+           << " end_version_size=" << end_versions.size();
+        msg = ss.str();
+        return;
+    }
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to init txn";
+        return;
+    }
+    for (size_t i = 0; i < rowset_ids.size(); i++) {
+        auto delete_bitmap_start = meta_delete_bitmap_key(
+                {instance_id, tablet_id, rowset_ids[i], begin_versions[i], 0});
+        auto delete_bitmap_end = meta_delete_bitmap_key(
+                {instance_id, tablet_id, rowset_ids[i], end_versions[i], 
INT64_MAX});
+        txn->remove(delete_bitmap_start, delete_bitmap_end);
+    }
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit job kv, err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
+void MetaServiceImpl::remove_delete_bitmap_update_lock(
+        google::protobuf::RpcController* controller,
+        const RemoveDeleteBitmapUpdateLockRequest* request,
+        RemoveDeleteBitmapUpdateLockResponse* response, 
::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(remove_delete_bitmap_update_lock);
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    if (cloud_unique_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "cloud unique id not set";
+        return;
+    }
+
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
+        return;
+    }
+    RPC_RATE_LIMIT(remove_delete_bitmap_update_lock)
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        msg = "failed to init txn";
+        return;
+    }
+    auto table_id = request->table_id();
+    std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1});
+    LOG(INFO) << "(" << instance_id << ")"
+              << "start to remove delete bitmap lock initiator, table_id=" << 
table_id
+              << ", key=" << hex(lock_key) << ", initiator=" << 
request->initiator();
+    std::string lock_val;
+    err = txn->get(lock_key, &lock_val);
+    if (err != TxnErrorCode::TXN_OK) {

Review Comment:
   What about the NOT_FOUND case?



##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -340,9 +341,125 @@ Status CloudCumulativeCompaction::modify_rowsets() {
                                                     stats.num_rows(), 
stats.data_size());
         }
     }
+    if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+        _tablet->enable_unique_key_merge_on_write()) {

Review Comment:
   
   ```suggestion
       if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
           _tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() 
!= 1) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -782,8 +784,8 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> 
tableToParttions, lo
                     LOG.warn("ignore get delete bitmap lock exception, 
transactionId={}, retryTime={}",
                             transactionId, retryTime, e);
                 }
-                // sleep random millis [20, 200] ms, avoid txn conflict
-                int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+                // sleep random millis [20, 300] ms, avoid txn conflict

Review Comment:
   Why was this changed to 300?



##########
be/src/olap/tablet_meta.cpp:
##########
@@ -1166,6 +1168,36 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
     }
 }
 
+void DeleteBitmap::add_to_remove_queue(
+        const std::tuple<int64_t, DeleteBitmap::BitmapKey, 
DeleteBitmap::BitmapKey>& tuple,
+        int64_t time_stamp) {
+    std::shared_lock l(stale_delete_bitmap_lock);
+    _stale_delete_bitmap.emplace(tuple, time_stamp);
+}
+
+void DeleteBitmap::remove_stale_delete_bitmap_from_queue() {
+    std::shared_lock l(stale_delete_bitmap_lock);
+    auto now = UnixMillis();
+    auto it = _stale_delete_bitmap.begin();
+    while (it != _stale_delete_bitmap.end()) {
+        if (now - it->second > config::tablet_rowset_stale_sweep_time_sec * 
1000) {
+            auto tablet_id = std::get<0>(it->first);
+            auto start_bmk = std::get<1>(it->first);
+            auto end_bmk = std::get<2>(it->first);
+            remove(start_bmk, end_bmk);
+
+            std::vector<std::string> pre_rowset_ids {};
+            pre_rowset_ids.emplace_back(std::get<0>(start_bmk).to_string());
+            CloudStorageEngine& engine = 
ExecEnv::GetInstance()->storage_engine().to_cloud();
+            auto st = 
engine.meta_mgr().remove_old_version_delete_bitmap(tablet_id, pre_rowset_ids,

Review Comment:
   Why not send these requests in a batch?



##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -340,9 +341,125 @@ Status CloudCumulativeCompaction::modify_rowsets() {
                                                     stats.num_rows(), 
stats.data_size());
         }
     }
+    if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+        _tablet->enable_unique_key_merge_on_write()) {
+        process_old_version_delete_bitmap();
+    }
     return Status::OK();
 }
 
+void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
+    // agg previously rowset old version delete bitmap
+    std::vector<RowsetSharedPtr> pre_rowsets {};
+    std::vector<std::string> pre_rowset_ids {};
+    for (const auto& it : cloud_tablet()->rowset_map()) {
+        if (it.first.second < _input_rowsets.front()->start_version()) {
+            pre_rowsets.emplace_back(it.second);
+            pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
+        }
+    }
+    std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
+    pre_rowsets.erase(pre_rowsets.begin());
+    if (!pre_rowsets.empty()) {
+        auto pre_max_version = _output_rowset->version().second;
+        DeleteBitmapPtr new_delete_bitmap =
+                
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
+        std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, 
DeleteBitmap::BitmapKey>>
+                to_remove_vec;
+        for (auto& rowset : pre_rowsets) {
+            for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); 
++seg_id) {
+                rowset->rowset_id().to_string();
+                DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
+                DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, 
pre_max_version};
+                DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), 
seg_id,
+                                                    pre_max_version - 1};
+                auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
+                        {rowset->rowset_id(), seg_id, pre_max_version});
+                to_remove_vec.emplace_back(
+                        std::make_tuple(_tablet->tablet_id(), start, 
before_end));
+                if (d->isEmpty()) {
+                    continue;
+                }
+                new_delete_bitmap->set(end, *d);
+            }
+        }
+        Status get_st;
+        Status update_st;
+        Status remove_st;
+        do {
+            if (!new_delete_bitmap->empty()) {
+                // store agg delete bitmap
+                int64_t initiator = boost::hash_range(_uuid.begin(), 
_uuid.end()) &
+                                    std::numeric_limits<int64_t>::max();
+                
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.get_mow_lock_failed", 
{
+                    get_st = Status::InternalError(
+                            "test fail to get_delete_bitmap_update_lock for "
+                            "tablet_id {}",
+                            cloud_tablet()->tablet_id());
+                });
+                if (get_st.ok()) {
+                    get_st = 
_engine.meta_mgr().get_delete_bitmap_update_lock(*cloud_tablet(), -1,

Review Comment:
   Do we need to add a retry here? I think it's quite easy to fail to get the
   lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to