This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 485d7db5161 [fix](partial update) Fix missing rowsets during doing
alignment when flushing memtable due to compaction (#28062)
485d7db5161 is described below
commit 485d7db5161efb011b43a10237df1ca83d5e5c3d
Author: bobhan1 <[email protected]>
AuthorDate: Sun Dec 10 12:09:48 2023 +0800
[fix](partial update) Fix missing rowsets during doing alignment when
flushing memtable due to compaction (#28062)
---
be/src/olap/olap_common.h | 9 ++-
be/src/olap/rowset/beta_rowset_writer.cpp | 27 ++++++++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 26 ++++++++
.../rowset/segment_v2/vertical_segment_writer.cpp | 26 ++++++++
.../test_unique_key_mow_rowsets_deleted.out | 19 ++++++
.../test_unique_key_mow_rowsets_deleted.groovy | 75 ++++++++++++++++++++++
.../test_partial_update_publish.groovy | 4 +-
7 files changed, 182 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 1921902a9d0..e63a8aba198 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -493,13 +493,18 @@ inline RowsetId extract_rowset_id(std::string_view
filename) {
class DeleteBitmap;
// merge on write context
struct MowContext {
- MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+ MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids,
std::shared_ptr<DeleteBitmap> db)
: max_version(version), txn_id(txnid), rowset_ids(ids),
delete_bitmap(db) {}
+ void update_rowset_ids_with_lock(std::function<void()> callback) {
+ std::lock_guard<std::mutex> lock(m);
+ callback();
+ }
int64_t max_version;
int64_t txn_id;
- const RowsetIdUnorderedSet& rowset_ids;
+ RowsetIdUnorderedSet& rowset_ids;
std::shared_ptr<DeleteBitmap> delete_bitmap;
+ std::mutex m; // protection for updating rowset_ids only
};
// used in mow partial update
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 6850ce0f43c..386c59ef2c4 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -182,6 +182,33 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t
segment_id) {
{
std::shared_lock meta_rlock(tablet->get_header_lock());
specified_rowsets =
tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+
DBUG_EXECUTE_IF("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets",
+ { specified_rowsets.clear(); });
+ if (specified_rowsets.size() !=
_context.mow_context->rowset_ids.size()) {
+ // `get_rowset_by_ids` may fail to find some of the rowsets we
request if cumulative compaction deletes
+ // rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for
details) before we get here.
+ // Because we haven't begun the calculation for the merge-on-write
table, we can safely reset the `_context.mow_context->rowset_ids`
+ // to the latest value and re-request the corresponding rowsets.
+ LOG(INFO) << fmt::format(
+ "[Memtable Flush] some rowsets have been deleted due to "
+ "compaction(specified_rowsets.size()={}, but
rowset_ids.size()={}), reset "
+ "rowset_ids to the latest value. tablet_id: {}, cur
max_version: {}, "
+ "transaction_id: {}",
+ specified_rowsets.size(),
_context.mow_context->rowset_ids.size(),
+ _context.tablet->tablet_id(),
_context.mow_context->max_version,
+ _context.mow_context->txn_id);
+ Status st {Status::OK()};
+ _context.mow_context->update_rowset_ids_with_lock([&]() {
+ _context.mow_context->rowset_ids.clear();
+ st = tablet->all_rs_id(_context.mow_context->max_version,
+ &_context.mow_context->rowset_ids);
+ });
+ if (!st.ok()) {
+ return st;
+ }
+ specified_rowsets =
tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+ DCHECK(specified_rowsets.size() ==
_context.mow_context->rowset_ids.size());
+ }
}
OlapStopWatch watch;
RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset, segments,
specified_rowsets,
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 2b8375da598..ba05dce1664 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -52,6 +52,7 @@
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/crc32c.h"
+#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
@@ -411,6 +412,31 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
{
std::shared_lock rlock(tablet->get_header_lock());
specified_rowsets =
tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
+ { specified_rowsets.clear(); });
+ if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+ // `get_rowset_by_ids` may fail to find some of the rowsets we
request if cumulative compaction deletes
+ // rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for
details) before we get here.
+ // Because we haven't begun the calculation for the merge-on-write
table, we can safely reset the `_mow_context->rowset_ids`
+ // to the latest value and re-request the corresponding rowsets.
+ LOG(INFO) << fmt::format(
+ "[Memtable Flush] some rowsets have been deleted due to "
+ "compaction(specified_rowsets.size()={}, but
rowset_ids.size()={}), reset "
+ "rowset_ids to the latest value. tablet_id: {}, cur
max_version: {}, "
+ "transaction_id: {}",
+ specified_rowsets.size(), _mow_context->rowset_ids.size(),
tablet->tablet_id(),
+ _mow_context->max_version, _mow_context->txn_id);
+ Status st {Status::OK()};
+ _mow_context->update_rowset_ids_with_lock([&]() {
+ _mow_context->rowset_ids.clear();
+ st = tablet->all_rs_id(_mow_context->max_version,
&_mow_context->rowset_ids);
+ });
+ if (!st.ok()) {
+ return st;
+ }
+ specified_rowsets =
tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+ DCHECK(specified_rowsets.size() ==
_mow_context->rowset_ids.size());
+ }
}
std::vector<std::unique_ptr<SegmentCacheHandle>>
segment_caches(specified_rowsets.size());
// locate rows in base data
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 842eecf3e10..766c3e47a78 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -49,6 +49,7 @@
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/crc32c.h"
+#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
@@ -344,6 +345,31 @@ Status
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
{
std::shared_lock rlock(tablet->get_header_lock());
specified_rowsets =
tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
+ { specified_rowsets.clear(); });
+ if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+ // `get_rowset_by_ids` may fail to find some of the rowsets we
request if cumulative compaction deletes
+ // rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for
details) before we get here.
+ // Because we haven't begun the calculation for the merge-on-write
table, we can safely reset the `_mow_context->rowset_ids`
+ // to the latest value and re-request the corresponding rowsets.
+ LOG(INFO) << fmt::format(
+ "[Memtable Flush] some rowsets have been deleted due to "
+ "compaction(specified_rowsets.size()={}, but
rowset_ids.size()={}), reset "
+ "rowset_ids to the latest value. tablet_id: {}, cur
max_version: {}, "
+ "transaction_id: {}",
+ specified_rowsets.size(), _mow_context->rowset_ids.size(),
tablet->tablet_id(),
+ _mow_context->max_version, _mow_context->txn_id);
+ Status st {Status::OK()};
+ _mow_context->update_rowset_ids_with_lock([&]() {
+ _mow_context->rowset_ids.clear();
+ st = tablet->all_rs_id(_mow_context->max_version,
&_mow_context->rowset_ids);
+ });
+ if (!st.ok()) {
+ return st;
+ }
+ specified_rowsets =
tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+ DCHECK(specified_rowsets.size() ==
_mow_context->rowset_ids.size());
+ }
}
std::vector<std::unique_ptr<SegmentCacheHandle>>
segment_caches(specified_rowsets.size());
// locate rows in base data
diff --git
a/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out
b/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out
new file mode 100644
index 00000000000..30662e8e0ba
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/test_unique_key_mow_rowsets_deleted.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1 --
+1 doris 1000 123 1
+2 doris2 2000 223 1
+
+-- !2 --
+1 doris 200 123 1
+2 doris2 400 223 1
+4 yixiu 400 \N 4321
+
+-- !3 --
+1 doris 1000 123 1
+2 doris2 2000 223 1
+
+-- !4 --
+1 doris333 6666 555 4
+2 doris666 9999 888 7
+3 doris222 1111 987 567
+
diff --git
a/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy
b/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy
new file mode 100644
index 00000000000..a1db433331b
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_unique_key_mow_rowsets_deleted.groovy
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_unique_key_mow_rowsets_deleted", "nonConcurrent"){
+
+ def tableName = "test_unique_key_mow_rowsets_deleted1"
+
+ // 1. requested rowsets have been deleted during partial update
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write"
= "true"); """
+
+ sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1,
"doris", 1000, 123, 1)"""
+ qt_1 """ select * from ${tableName} order by id; """
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
+ sql "set enable_unique_key_partial_update=true;"
+ sql "set enable_insert_strict = false;"
+ sql "sync;"
+ sql """insert into ${tableName}(id,score)
values(2,400),(1,200),(4,400)"""
+ qt_2 """ select * from ${tableName} order by id; """
+ sql "set enable_unique_key_partial_update=false;"
+ sql "set enable_insert_strict = true;"
+ sql "sync;"
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
+ }
+ sql "DROP TABLE IF EXISTS ${tableName};"
+
+
+ // 2. requested rowsets have been deleted during row update
+ tableName = "test_unique_key_mow_rowsets_deleted2"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write"
= "true"); """
+ sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1,
"doris", 1000, 123, 1)"""
+ qt_3 """ select * from ${tableName} order by id; """
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
+ sql """insert into ${tableName} values(2, "doris666", 9999, 888,
7),(1, "doris333", 6666, 555, 4), (3, "doris222", 1111, 987, 567);"""
+ qt_4 """ select * from ${tableName} order by id; """
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
+ }
+ sql "DROP TABLE IF EXISTS ${tableName};"
+}
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy
index 5f83c092a95..0e63b87331c 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy
@@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") {
file '10000.csv'
time 10000 // limit inflight 10s
- }
+ }
streamLoad {
table "${tableName}"
@@ -68,5 +68,5 @@ suite("test_primary_key_partial_update_publish", "p0") {
"""
// drop drop
- // sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ DROP TABLE IF EXISTS ${tableName} """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]