This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3c93a40bd37 [fix](cloud-mow) compaction may cause duplicate key if get delete bitmap from cache failed (#41309)
3c93a40bd37 is described below

commit 3c93a40bd376d0f3fb1e49a9285992f203ebaa8d
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Thu Sep 26 11:46:49 2024 +0800

    [fix](cloud-mow) compaction may cause duplicate key if get delete bitmap from cache failed (#41309)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    To speed up syncing of the latest delete bitmap, #35856 tries to get the
    delete bitmap from `CloudTxnDeleteBitmapCache` first. In the following
    situation, compaction may get an empty delete bitmap and cause duplicate keys:
    1. Compaction starts.
    2. Several loads succeed while the compaction is running.
    3. Compaction finishes merging data and starts to calculate the delete bitmaps
       generated by the latest load tasks.
    4. Compaction tries to sync rowsets and delete bitmaps, getting the delete
       bitmap from `CloudTxnDeleteBitmapCache` first.
    5. `CloudTxnDeleteBitmapCache::get_delete_bitmap()` finds the txn info in its
       inner map, but the lookup misses in the LRU cache; instead of reporting an
       error, it returns an empty delete bitmap.
    6. Compaction uses the wrong (empty) delete bitmap, and duplicate keys occur.
    
    A minimal standalone sketch of the corrected handling pattern is appended
    after the diff at the end of this message.
---
 be/src/cloud/cloud_cumulative_compaction.cpp       |  9 ++
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     | 28 +++++--
 ...ompaction_get_delete_bitmap_from_cache_fail.out | 22 +++++
 ...action_get_delete_bitmap_from_cache_fail.groovy | 97 ++++++++++++++++++++++
 4 files changed, 150 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index ea5fa7cc340..cb0c4e7f376 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -255,6 +255,15 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     compaction_job->add_txn_id(_output_rowset->txn_id());
     compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
 
+    DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
+        LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
+        while (DebugPoints::instance()->is_enable(
+                "CloudCumulativeCompaction::modify_rowsets.block")) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        }
+        LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
+    });
+
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
                         std::numeric_limits<int64_t>::max();
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index 63a21bc0714..55f76316fb7 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -72,9 +72,18 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
         *publish_status = iter->second.publish_status;
         *previous_publish_info = iter->second.publish_info;
     }
-    RETURN_IF_ERROR(
-            get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
-    return Status::OK();
+
+    auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);
+
+    if (st.is<ErrorCode::NOT_FOUND>()) {
+        // Because of the rowset_ids become empty, all delete bitmap
+        // will be recalculate in CalcDeleteBitmapTask
+        if (delete_bitmap != nullptr) {
+            *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+        }
+        return Status::OK();
+    }
+    return st;
 }
 
 Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
@@ -95,6 +104,13 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
     CacheKey key(key_str);
     Cache::Handle* handle = lookup(key);
 
+    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
+        handle = nullptr;
+        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
+                     "when get delete bitmap, txn_id:"
+                  << transaction_id << ", tablet_id: " << tablet_id;
+    });
+
     DeleteBitmapCacheValue* val =
             handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
     if (val) {
@@ -109,9 +125,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
         LOG_INFO("cache missed when get delete bitmap")
                 .tag("txn_id", transaction_id)
                 .tag("tablet_id", tablet_id);
-        // Because of the rowset_ids become empty, all delete bitmap
-        // will be recalculate in CalcDeleteBitmapTask
-        *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+        return Status::Error<ErrorCode::NOT_FOUND, false>(
+                "cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
+                transaction_id);
     }
     return Status::OK();
 }
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out
new file mode 100644
index 00000000000..f3c19536c9b
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	1	1
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+
+-- !sql --
+1	2	3
+2	3	4
+3	4	5
+4	4	4
+5	5	5
+
+-- !sql --
+1	2	3
+2	3	4
+3	4	5
+4	4	4
+5	5	5
+
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy
new file mode 100644
index 00000000000..72fda5eea26
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def tableName = "test_cloud_mow_compaction_get_delete_bitmap_from_cache_fail"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName}
+            (k int, v1 int, v2 int )
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH (k)
+            BUCKETS 1 PROPERTIES(
+            "replication_num" = "1",
+            "enable_unique_key_merge_on_write"="true",
+            "disable_auto_compaction" = "true");
+    """
+
+    sql "insert into ${tableName} values(1,1,1);"
+    sql "insert into ${tableName} values(2,2,2);"
+    sql "insert into ${tableName} values(3,3,3);"
+    sql "insert into ${tableName} values(4,4,4);"
+    sql "insert into ${tableName} values(5,5,5);"
+    sql "sync;"
+    order_qt_sql "select * from ${tableName};"
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        def inject_spin_wait = 'CloudCumulativeCompaction::modify_rowsets.enable_spin_wait'
+        def inject_spin_block = 'CloudCumulativeCompaction::modify_rowsets.block'
+        def inject_cache_miss = 'CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss'
+        def injectBe = null
+        def backends = sql_return_maparray('show backends')
+        def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+        def injectBeId = array[0].BackendId
+        def tabletId = array[0].TabletId
+        injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null)
+
+        DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_wait)
+        DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_block)
+        DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, inject_cache_miss)
+        logger.info("run compaction:" + tabletId)
+        (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+        // Concurrent inserts
+        sql "insert into ${tableName} values(1,2,3);"
+        sql "insert into ${tableName} values(2,3,4);"
+        sql "insert into ${tableName} values(3,4,5);"
+        sql "sync;"
+        order_qt_sql "set use_fix_replica=0; select * from ${tableName};"
+
+        // let compaction continue
+        DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, inject_spin_block)
+
+        do {
+            Thread.sleep(100)
+            (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId)
+            logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+
+        Thread.sleep(200)
+        order_qt_sql "set use_fix_replica=0; select * from ${tableName};"
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        assertTrue(false)
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
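As a standalone illustration (not part of the patch above), here is a minimal C++ sketch of the cache-miss handling pattern the fix introduces. The `Status`, `Bitmap`, and `DeleteBitmapCache` types below are simplified, hypothetical stand-ins for Doris's `Status`, `DeleteBitmap`, and `CloudTxnDeleteBitmapCache`; the point is only the behavior: the cache reports a miss as `NOT_FOUND` instead of silently handing back an empty bitmap, and only the caller that is prepared to trigger a full recalculation (the equivalent of `get_tablet_txn_info()`) converts that miss into an empty bitmap plus `OK`.

```cpp
// Minimal sketch of the cache-miss handling pattern described in the commit
// message. All names here (Status, Bitmap, DeleteBitmapCache, ...) are
// hypothetical stand-ins, not the actual Doris types.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <utility>

enum class Status { OK, NOT_FOUND, INTERNAL_ERROR };

struct Bitmap {
    std::set<uint64_t> rows; // rows marked as deleted
};

class DeleteBitmapCache {
public:
    void put(int64_t txn_id, std::shared_ptr<Bitmap> bm) { _cache[txn_id] = std::move(bm); }

    // Old behaviour (the bug): on a miss, return OK with an empty bitmap,
    // which the caller cannot distinguish from "no rows were deleted".
    // New behaviour (the fix): report the miss explicitly as NOT_FOUND.
    Status get(int64_t txn_id, std::shared_ptr<Bitmap>* out) const {
        auto it = _cache.find(txn_id);
        if (it == _cache.end()) {
            return Status::NOT_FOUND;
        }
        *out = it->second;
        return Status::OK;
    }

private:
    std::map<int64_t, std::shared_ptr<Bitmap>> _cache;
};

// Caller-side handling, mirroring get_tablet_txn_info() in the patch: a
// NOT_FOUND from the cache is not an error for this caller, it just means the
// delete bitmap has to be recalculated from scratch, so reset it to an empty
// bitmap and return OK; any other error is propagated unchanged.
Status load_txn_delete_bitmap(const DeleteBitmapCache& cache, int64_t txn_id,
                              std::shared_ptr<Bitmap>* delete_bitmap) {
    Status st = cache.get(txn_id, delete_bitmap);
    if (st == Status::NOT_FOUND) {
        if (delete_bitmap != nullptr) {
            *delete_bitmap = std::make_shared<Bitmap>(); // recalculated later
        }
        return Status::OK;
    }
    return st;
}

int main() {
    DeleteBitmapCache cache;
    std::shared_ptr<Bitmap> bm;
    // Simulated cache miss: txn 42 was never inserted (or was evicted).
    Status st = load_txn_delete_bitmap(cache, 42, &bm);
    std::cout << std::boolalpha << "status ok: " << (st == Status::OK)
              << ", bitmap empty: " << bm->rows.empty() << "\n";
    return 0;
}
```

With the old behaviour, a caller such as compaction could not distinguish "no rows deleted" from "cache entry evicted", which is exactly the scenario in steps 4-6 of the commit message; with the explicit `NOT_FOUND`, callers that cannot simply recalculate see the miss as an error rather than silently operating on an empty bitmap.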