This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 8c8f322574d [fix](partial update) Fix missing rowsets during doing alignment when flushing memtable due to compaction #28062 (#28468) 8c8f322574d is described below commit 8c8f322574df58e900d9ccef0d680d723d594368 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Sat Dec 16 19:40:10 2023 +0800 [fix](partial update) Fix missing rowsets during doing alignment when flushing memtable due to compaction #28062 (#28468) --- be/src/olap/memtable.cpp | 26 ++++++++ be/src/olap/olap_common.h | 9 ++- be/src/olap/rowset/segment_v2/segment_writer.cpp | 26 ++++++++ .../test_unique_key_mow_rowsets_deleted.out | 19 ++++++ .../test_unique_key_mow_rowsets_deleted.groovy | 75 ++++++++++++++++++++++ .../test_partial_update_publish.groovy | 4 +- 6 files changed, 155 insertions(+), 4 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 880609b053f..578ee4a7857 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -44,6 +44,7 @@ #include "runtime/load_channel_mgr.h" #include "runtime/thread_context.h" #include "tablet_meta.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -474,6 +475,31 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) { { std::shared_lock meta_rlock(_tablet->get_header_lock()); specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); + DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets", + { specified_rowsets.clear(); }); + if (specified_rowsets.size() != _mow_context->rowset_ids.size()) { + // `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction delete + // rowsets from `_rs_version_map`(see `Tablet::modify_rowsets` for detials) before we get here. 
+ // Because we haven't begun calculation for merge-on-write table, we can safely reset the `_mow_context->rowset_ids` + // to the latest value and re-request the corresponding rowsets. + LOG(INFO) << fmt::format( + "[Memtable Flush] some rowsets have been deleted due to " + "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset " + "rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, " + "transaction_id: {}", + specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(), + _mow_context->max_version, _mow_context->txn_id); + Status st {Status::OK()}; + _mow_context->update_rowset_ids_with_lock([&]() { + _mow_context->rowset_ids.clear(); + st = _tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids); + }); + if (!st.ok()) { + return st; + } + specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); + DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size()); + } } OlapStopWatch watch; RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets, diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index e1c5717fcd0..509b9773bf1 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -463,13 +463,18 @@ using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>; class DeleteBitmap; // merge on write context struct MowContext { - MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, + MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids, std::shared_ptr<DeleteBitmap> db) : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {} + void update_rowset_ids_with_lock(std::function<void()> callback) { + std::lock_guard<std::mutex> lock(m); + callback(); + } int64_t max_version; int64_t txn_id; - const RowsetIdUnorderedSet& rowset_ids; + RowsetIdUnorderedSet& rowset_ids; std::shared_ptr<DeleteBitmap> delete_bitmap; + std::mutex m; // protection for updating 
rowset_ids only }; // used in mow partial update diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 7ef33980b5e..e58619ec25b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -51,6 +51,7 @@ #include "service/point_query_executor.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/debug_points.h" #include "util/faststring.h" #include "util/key_util.h" #include "vec/columns/column_nullable.h" @@ -401,6 +402,31 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* { std::shared_lock rlock(_tablet->get_header_lock()); specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); + DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets", + { specified_rowsets.clear(); }); + if (specified_rowsets.size() != _mow_context->rowset_ids.size()) { + // `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction delete + // rowsets from `_rs_version_map`(see `Tablet::modify_rowsets` for detials) before we get here. + // Becasue we havn't begun calculation for merge-on-write table, we can safely reset the `_mow_context->rowset_ids` + // to the latest value and re-request the correspoding rowsets. + LOG(INFO) << fmt::format( + "[Memtable Flush] some rowsets have been deleted due to " + "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset " + "rowset_ids to the latest value. 
tablet_id: {}, cur max_version: {}, " + "transaction_id: {}", + specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(), + _mow_context->max_version, _mow_context->txn_id); + Status st {Status::OK()}; + _mow_context->update_rowset_ids_with_lock([&]() { + _mow_context->rowset_ids.clear(); + st = _tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids); + }); + if (!st.ok()) { + return st; + } + specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); + DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size()); + } } std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); // locate rows in base data diff --git a/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out b/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out new file mode 100644 index 00000000000..30662e8e0ba --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 doris 1000 123 1 +2 doris2 2000 223 1 + +-- !2 -- +1 doris 200 123 1 +2 doris2 400 223 1 +4 yixiu 400 \N 4321 + +-- !3 -- +1 doris 1000 123 1 +2 doris2 2000 223 1 + +-- !4 -- +1 doris333 6666 555 4 +2 doris666 9999 888 7 +3 doris222 1111 987 567 + diff --git a/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy b/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy new file mode 100644 index 00000000000..a1db433331b --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_unique_key_mow_rowsets_deleted", "nonConcurrent"){ + + def tableName = "test_unique_key_mow_rowsets_deleted1" + + // 1. requested rowsets have been deleted during partial update + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321") + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ + + sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)""" + qt_1 """ select * from ${tableName} order by id; """ + try { + GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets") + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict = false;" + sql "sync;" + sql """insert into ${tableName}(id,score) values(2,400),(1,200),(4,400)""" + qt_2 """ select * from ${tableName} order by id; """ + sql "set enable_unique_key_partial_update=false;" + sql "set enable_insert_strict = true;" + sql "sync;" + } finally { + 
GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets") + } + sql "DROP TABLE IF EXISTS ${tableName};" + + + // 2. requested rowsets have been deleted during row update + tableName = "test_unique_key_mow_rowsets_deleted2" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321") + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ + sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)""" + qt_3 """ select * from ${tableName} order by id; """ + try { + GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets") + sql """insert into ${tableName} values(2, "doris666", 9999, 888, 7),(1, "doris333", 6666, 555, 4), (3, "doris222", 1111, 987, 567);""" + qt_4 """ select * from ${tableName} order by id; """ + } finally { + GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets") + } + sql "DROP TABLE IF EXISTS ${tableName};" +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy index 5f83c092a95..0e63b87331c 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy @@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") { file '10000.csv' time 10000 // limit inflight 10s - } + } streamLoad { table "${tableName}" @@ -68,5 +68,5 @@ 
suite("test_primary_key_partial_update_publish", "p0") { """ // drop drop - // sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ DROP TABLE IF EXISTS ${tableName} """ } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org