This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c93a40bd37 [fix](cloud-mow) compaction may cause duplicate key if get 
delete bitmap from cache failed (#41309)
3c93a40bd37 is described below

commit 3c93a40bd376d0f3fb1e49a9285992f203ebaa8d
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Thu Sep 26 11:46:49 2024 +0800

    [fix](cloud-mow) compaction may cause duplicate key if get delete bitmap 
from cache failed (#41309)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    To speed up syncing the latest delete bitmap, #35856 tries to get
    the delete bitmap from `CloudTxnDeleteBitmapCache` first.
    In the following situation, compaction may get an empty delete bitmap
    and cause duplicate keys:
    1. compaction starts
    2. several loads succeed during the compaction
    3. compaction finishes merging data and starts to calculate the delete
    bitmap generated by the latest load tasks
    4. compaction tries to sync rowsets and the delete bitmap; it first
    gets the delete bitmap from `CloudTxnDeleteBitmapCache`
    5. `CloudTxnDeleteBitmapCache::get_delete_bitmap()` can get the txn infos
    from its inner map, but misses when it tries to get the delete bitmap
    from the LRU cache; instead of reporting an error, it returns an empty
    delete bitmap
    6. compaction uses the wrong delete bitmap, and duplicate keys occur.
---
 be/src/cloud/cloud_cumulative_compaction.cpp       |  9 ++
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     | 28 +++++--
 ...ompaction_get_delete_bitmap_from_cache_fail.out | 22 +++++
 ...action_get_delete_bitmap_from_cache_fail.groovy | 97 ++++++++++++++++++++++
 4 files changed, 150 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index ea5fa7cc340..cb0c4e7f376 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -255,6 +255,15 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     compaction_job->add_txn_id(_output_rowset->txn_id());
     
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
 
+    
DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
+        LOG(INFO) << 
"CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
+        while (DebugPoints::instance()->is_enable(
+                "CloudCumulativeCompaction::modify_rowsets.block")) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        }
+        LOG(INFO) << 
"CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
+    });
+
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     int64_t initiator =
             HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & 
std::numeric_limits<int64_t>::max();
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index 63a21bc0714..55f76316fb7 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -72,9 +72,18 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
         *publish_status = iter->second.publish_status;
         *previous_publish_info = iter->second.publish_info;
     }
-    RETURN_IF_ERROR(
-            get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, 
rowset_ids, nullptr));
-    return Status::OK();
+
+    auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, 
rowset_ids, nullptr);
+
+    if (st.is<ErrorCode::NOT_FOUND>()) {
+        // Because of the rowset_ids become empty, all delete bitmap
+        // will be recalculate in CalcDeleteBitmapTask
+        if (delete_bitmap != nullptr) {
+            *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+        }
+        return Status::OK();
+    }
+    return st;
 }
 
 Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
@@ -95,6 +104,13 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
     CacheKey key(key_str);
     Cache::Handle* handle = lookup(key);
 
+    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", 
{
+        handle = nullptr;
+        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, 
make cache missed "
+                     "when get delete bitmap, txn_id:"
+                  << transaction_id << ", tablet_id: " << tablet_id;
+    });
+
     DeleteBitmapCacheValue* val =
             handle == nullptr ? nullptr : 
reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
     if (val) {
@@ -109,9 +125,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
         LOG_INFO("cache missed when get delete bitmap")
                 .tag("txn_id", transaction_id)
                 .tag("tablet_id", tablet_id);
-        // Because of the rowset_ids become empty, all delete bitmap
-        // will be recalculate in CalcDeleteBitmapTask
-        *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+        return Status::Error<ErrorCode::NOT_FOUND, false>(
+                "cache missed when get delete bitmap, tablet_id={}, 
transaction_id={}", tablet_id,
+                transaction_id);
     }
     return Status::OK();
 }
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out
new file mode 100644
index 00000000000..f3c19536c9b
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1
+2      2       2
+3      3       3
+4      4       4
+5      5       5
+
+-- !sql --
+1      2       3
+2      3       4
+3      4       5
+4      4       4
+5      5       5
+
+-- !sql --
+1      2       3
+2      3       4
+3      4       5
+4      4       4
+5      5       5
+
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy
new file mode 100644
index 00000000000..72fda5eea26
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail", 
"nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def tableName = 
"test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName}
+            (k int, v1 int, v2 int )
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH (k)
+            BUCKETS 1  PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write"="true",
+                "disable_auto_compaction" = "true");
+        """
+
+    sql "insert into ${tableName} values(1,1,1);"
+    sql "insert into ${tableName} values(2,2,2);"
+    sql "insert into ${tableName} values(3,3,3);"
+    sql "insert into ${tableName} values(4,4,4);"
+    sql "insert into ${tableName} values(5,5,5);"
+    sql "sync;"
+    order_qt_sql "select * from ${tableName};"
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        def inject_spin_wait = 
'CloudCumulativeCompaction::modify_rowsets.enable_spin_wait'
+        def inject_spin_block = 
'CloudCumulativeCompaction::modify_rowsets.block'
+        def inject_cache_miss = 
'CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss'
+        def injectBe = null
+        def backends = sql_return_maparray('show backends')
+        def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+        def injectBeId = array[0].BackendId
+        def tabletId = array[0].TabletId
+        injectBe = backends.stream().filter(be -> be.BackendId == 
injectBeId).findFirst().orElse(null)
+
+        DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_wait)
+        DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_block)
+        DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, inject_cache_miss)
+        logger.info("run compaction:" + tabletId)
+        (code, out, err) = be_run_cumulative_compaction(injectBe.Host, 
injectBe.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
+
+        // Concurrent inserts
+        sql "insert into ${tableName} values(1,2,3);"
+        sql "insert into ${tableName} values(2,3,4);"
+        sql "insert into ${tableName} values(3,4,5);"
+        sql "sync;"
+        order_qt_sql "set use_fix_replica=0; select * from ${tableName};"
+
+        // let compaction continue
+        DebugPoint.disableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_block)
+
+         do {
+             Thread.sleep(100)
+             (code, out, err) = be_get_compaction_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+             logger.info("Get compaction status: code=" + code + ", out=" + 
out + ", err=" + err)
+             assertEquals(code, 0)
+             def compactionStatus = parseJson(out.trim())
+             assertEquals("success", compactionStatus.status.toLowerCase())
+             running = compactionStatus.run_status
+         } while (running)
+
+        Thread.sleep(200)
+        order_qt_sql "set use_fix_replica=0; select * from ${tableName};"
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        assertTrue(false)
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to