This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2cc05ce142ff4833d43b9f2c30b43ea4e02e3e47
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Wed Aug 28 11:15:42 2024 +0800

    [Fix](merge-on-write) Fix duplicate key problem after adding sequence column for merge-on-write table (#39958)
    
    ## Proposed changes
    Currently, `BaseTablet::lookup_row_key()` uses the tablet meta's schema
    (`_tablet_meta->tablet_schema()`) to decide whether a tablet has a
    sequence column. However, users can run
    `ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ...` to add a hidden
    sequence column to a merge-on-write (MOW) table. This is a light schema
    change that does not update the BE's tablet meta, so
    `BaseTablet::lookup_row_key()` behaves incorrectly.
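    This PR uses the schema of the current load, which is the latest schema,
    to decide whether a tablet has a sequence column. It also corrects the
    lookup procedure in `BaseTablet::lookup_row_key()` and
    `Segment::lookup_row_key()`. Segments written before the sequence column
    was added carry no sequence value in their primary-key index entries.
    For that reason, `Segment::lookup_row_key()` still checks the segment's
    own schema when it strips the sequence and row-id suffixes from a stored
    key.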
    
    branch-2.1-pick: https://github.com/apache/doris/pull/40010
    branch-2.0-pick: https://github.com/apache/doris/pull/40015
---
 be/src/olap/base_tablet.cpp                        | 26 ++++----
 be/src/olap/base_tablet.h                          |  2 +-
 be/src/olap/rowset/segment_v2/segment.cpp          | 30 ++++-----
 be/src/olap/rowset/segment_v2/segment.h            |  4 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  5 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  5 +-
 be/src/service/point_query_executor.cpp            |  6 +-
 .../test_mow_enable_sequence_col.out               | 16 +++++
 .../test_mow_enable_sequence_col.groovy            | 72 ++++++++++++++++++++++
 9 files changed, 130 insertions(+), 36 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index db1e0283854..c2bb9c98e2b 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -475,21 +475,22 @@ Status BaseTablet::lookup_row_data(const Slice& 
encoded_key, const RowLocation&
     return Status::OK();
 }
 
-Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
+Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* 
latest_schema,
+                                  bool with_seq_col,
                                   const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                                   RowLocation* row_location, uint32_t version,
                                   
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
                                   RowsetSharedPtr* rowset, bool with_rowid) {
     SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
     size_t seq_col_length = 0;
-    if (_tablet_meta->tablet_schema()->has_sequence_col() && with_seq_col) {
-        seq_col_length = _tablet_meta->tablet_schema()
-                                 
->column(_tablet_meta->tablet_schema()->sequence_col_idx())
-                                 .length() +
-                         1;
+    // use the latest tablet schema to decide if the tablet has sequence 
column currently
+    const TabletSchema* schema =
+            (latest_schema == nullptr ? _tablet_meta->tablet_schema().get() : 
latest_schema);
+    if (schema->has_sequence_col() && with_seq_col) {
+        seq_col_length = schema->column(schema->sequence_col_idx()).length() + 
1;
     }
     size_t rowid_length = 0;
-    if (with_rowid && 
!_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+    if (with_rowid && !schema->cluster_key_idxes().empty()) {
         rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
     }
     Slice key_without_seq =
@@ -505,7 +506,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
bool with_seq_col,
         for (int i = num_segments - 1; i >= 0; i--) {
             // If mow table has cluster keys, the key bounds is short keys, 
not primary keys
             // use PrimaryKeyIndexMetaPB in primary key index?
-            if (_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+            if (schema->cluster_key_idxes().empty()) {
                 if (key_without_seq.compare(segments_key_bounds[i].max_key()) 
> 0 ||
                     key_without_seq.compare(segments_key_bounds[i].min_key()) 
< 0) {
                     continue;
@@ -526,7 +527,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
bool with_seq_col,
         DCHECK_EQ(segments.size(), num_segments);
 
         for (auto id : picked_segments) {
-            Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, 
with_rowid, &loc);
+            Status s = segments[id]->lookup_row_key(encoded_key, schema, 
with_seq_col, with_rowid,
+                                                    &loc);
             if (s.is<KEY_NOT_FOUND>()) {
                 continue;
             }
@@ -537,7 +539,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
bool with_seq_col,
                                   {loc.rowset_id, loc.segment_id, version}, 
loc.row_id)) {
                 // if has sequence col, we continue to compare the sequence_id 
of
                 // all rowsets, util we find an existing key.
-                if (_tablet_meta->tablet_schema()->has_sequence_col()) {
+                if (schema->has_sequence_col()) {
                     continue;
                 }
                 // The key is deleted, we don't need to search for it any more.
@@ -718,8 +720,8 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
             }
 
             RowsetSharedPtr rowset_find;
-            auto st = lookup_row_key(key, true, specified_rowsets, &loc, 
dummy_version.first - 1,
-                                     segment_caches, &rowset_find);
+            auto st = lookup_row_key(key, rowset_schema.get(), true, 
specified_rowsets, &loc,
+                                     dummy_version.first - 1, segment_caches, 
&rowset_find);
             bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || 
st.is<KEY_ALREADY_EXISTS>();
             // It's a defensive DCHECK, we need to exclude some common errors 
to avoid core-dump
             // while stress test
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index d329c786fc9..a781f6d393c 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -144,7 +144,7 @@ public:
     // Lookup the row location of `encoded_key`, the function sets 
`row_location` on success.
     // NOTE: the method only works in unique key model with primary key index, 
you will got a
     //       not supported error in other data model.
-    Status lookup_row_key(const Slice& encoded_key, bool with_seq_col,
+    Status lookup_row_key(const Slice& encoded_key, TabletSchema* 
latest_schema, bool with_seq_col,
                           const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                           RowLocation* row_location, uint32_t version,
                           std::vector<std::unique_ptr<SegmentCacheHandle>>& 
segment_caches,
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 2666fc8b633..68a186205b3 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -745,14 +745,14 @@ Status Segment::new_inverted_index_iterator(const 
TabletColumn& tablet_column,
     return Status::OK();
 }
 
-Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool 
with_rowid,
-                               RowLocation* row_location) {
+Status Segment::lookup_row_key(const Slice& key, const TabletSchema* 
latest_schema,
+                               bool with_seq_col, bool with_rowid, 
RowLocation* row_location) {
     RETURN_IF_ERROR(load_pk_index_and_bf());
-    bool has_seq_col = _tablet_schema->has_sequence_col();
-    bool has_rowid = !_tablet_schema->cluster_key_idxes().empty();
+    bool has_seq_col = latest_schema->has_sequence_col();
+    bool has_rowid = !latest_schema->cluster_key_idxes().empty();
     size_t seq_col_length = 0;
     if (has_seq_col) {
-        seq_col_length = 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
+        seq_col_length = 
latest_schema->column(latest_schema->sequence_col_idx()).length() + 1;
     }
     size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;
 
@@ -788,16 +788,20 @@ Status Segment::lookup_row_key(const Slice& key, bool 
with_seq_col, bool with_ro
 
     Slice sought_key = Slice(index_column->get_data_at(0).data, 
index_column->get_data_at(0).size);
 
+    // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." 
to add a hidden sequence column
+    // for a merge-on-write table which doesn't have sequence column, so 
`has_seq_col ==  true` doesn't mean
+    // data in segment has sequence column value
+    bool segment_has_seq_col = _tablet_schema->has_sequence_col();
+    Slice sought_key_without_seq = Slice(
+            sought_key.get_data(),
+            sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) 
- rowid_length);
     if (has_seq_col) {
-        Slice sought_key_without_seq =
-                Slice(sought_key.get_data(), sought_key.get_size() - 
seq_col_length - rowid_length);
-
         // compare key
         if (key_without_seq.compare(sought_key_without_seq) != 0) {
             return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in 
the segment");
         }
 
-        if (with_seq_col) {
+        if (with_seq_col && segment_has_seq_col) {
             // compare sequence id
             Slice sequence_id =
                     Slice(key.get_data() + key_without_seq.get_size() + 1, 
seq_col_length - 1);
@@ -819,11 +823,9 @@ Status Segment::lookup_row_key(const Slice& key, bool 
with_seq_col, bool with_ro
     }
     // found the key, use rowid in pk index if necessary.
     if (has_rowid) {
-        Slice sought_key_without_seq =
-                Slice(sought_key.get_data(), sought_key.get_size() - 
seq_col_length - rowid_length);
-        Slice rowid_slice = Slice(
-                sought_key.get_data() + sought_key_without_seq.get_size() + 
seq_col_length + 1,
-                rowid_length - 1);
+        Slice rowid_slice = Slice(sought_key.get_data() + 
sought_key_without_seq.get_size() +
+                                          (segment_has_seq_col ? 
seq_col_length : 0) + 1,
+                                  rowid_length - 1);
         const auto* type_info = 
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
         const auto* rowid_coder = get_key_coder(type_info->type());
         RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, 
rowid_length,
diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h
index dd61e7eb831..46484ce919c 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -128,8 +128,8 @@ public:
         return _pk_index_reader.get();
     }
 
-    Status lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
-                          RowLocation* row_location);
+    Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, 
bool with_seq_col,
+                          bool with_rowid, RowLocation* row_location);
 
     Status read_key_by_rowid(uint32_t row_id, std::string* key);
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 42e625746f3..d543faf8888 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -588,8 +588,9 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
         RowLocation loc;
         // save rowset shared ptr so this rowset wouldn't delete
         RowsetSharedPtr rowset;
-        auto st = _tablet->lookup_row_key(key, have_input_seq_column, 
specified_rowsets, &loc,
-                                          _mow_context->max_version, 
segment_caches, &rowset);
+        auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), 
have_input_seq_column,
+                                          specified_rowsets, &loc, 
_mow_context->max_version,
+                                          segment_caches, &rowset);
         if (st.is<KEY_NOT_FOUND>()) {
             if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
                 ++num_rows_filtered;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index ecb248b7fe9..dd6a97c13d8 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -426,8 +426,9 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
         RowLocation loc;
         // save rowset shared ptr so this rowset wouldn't delete
         RowsetSharedPtr rowset;
-        auto st = _tablet->lookup_row_key(key, have_input_seq_column, 
specified_rowsets, &loc,
-                                          _mow_context->max_version, 
segment_caches, &rowset);
+        auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), 
have_input_seq_column,
+                                          specified_rowsets, &loc, 
_mow_context->max_version,
+                                          segment_caches, &rowset);
         if (st.is<KEY_NOT_FOUND>()) {
             if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
                 ++num_rows_filtered;
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index da79726f389..88293ba9b03 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -383,9 +383,9 @@ Status PointQueryExecutor::_lookup_row_key() {
         }
         // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this 
ptr
         auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
-        st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, false, 
specified_rowsets,
-                                      &location, INT32_MAX /*rethink?*/, 
segment_caches,
-                                      rowset_ptr.get(), false));
+        st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, 
false,
+                                      specified_rowsets, &location, INT32_MAX 
/*rethink?*/,
+                                      segment_caches, rowset_ptr.get(), 
false));
         if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
             continue;
         }
diff --git 
a/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out 
b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out
new file mode 100644
index 00000000000..d99510cfbcb
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+111    aaa     bbb     11
+222    bbb     bbb     11
+333    ccc     ddd     11
+
+-- !sql --
+111    aaa     bbb     11      \N      0       2
+222    bbb     bbb     11      \N      0       3
+333    ccc     ddd     11      \N      0       4
+
+-- !sql --
+111    zzz     yyy     100     99      0       5
+222    xxx     www     400     99      0       8
+333    ccc     ddd     11      \N      0       4
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy 
b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy
new file mode 100644
index 00000000000..2cfb8133fd6
--- /dev/null
+++ 
b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy
@@ -0,0 +1,72 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_enable_sequence_col") {
+
+    def tableName = "test_mow_enable_sequence_col"
+    sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+    sql """CREATE TABLE IF NOT EXISTS ${tableName}
+            (`user_id` BIGINT NOT NULL,
+            `username` VARCHAR(50) NOT NULL,
+            `city` VARCHAR(20),
+            `age` SMALLINT)
+            UNIQUE KEY(`user_id`)
+            DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+            PROPERTIES (
+            "disable_auto_compaction" = true,
+            "replication_allocation" = "tag.location.default: 1",
+            "enable_unique_key_merge_on_write" = "true");"""
+
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) 
VALUES(111,'aaa','bbb',11);"""
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) 
VALUES(222,'bbb','bbb',11);"""
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) 
VALUES(333,'ccc','ddd',11);"""
+    order_qt_sql "select * from ${tableName};"
+
+    sql "set show_hidden_columns = true;"
+    sql "sync;"
+    def res = sql "desc ${tableName} all;"
+    assertTrue(!res.toString().contains("__DORIS_SEQUENCE_COL__"))
+    sql "set show_hidden_columns = false;"
+    sql "sync;"
+
+    def doSchemaChange = { cmd ->
+        sql cmd
+        waitForSchemaChangeDone {
+            sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' 
ORDER BY createtime DESC LIMIT 1"""
+            time 2000
+        }
+    }
+    doSchemaChange """ALTER TABLE ${tableName} ENABLE FEATURE "SEQUENCE_LOAD" 
WITH PROPERTIES ("function_column.sequence_type" = "bigint");"""
+    
+    sql "set show_hidden_columns = true;"
+    sql "sync;"
+    res = sql "desc ${tableName} all;"
+    assertTrue(res.toString().contains("__DORIS_SEQUENCE_COL__"))
+    order_qt_sql "select * from ${tableName};"
+    sql "set show_hidden_columns = false;"
+    sql "sync;"
+
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, 
`__DORIS_SEQUENCE_COL__`) VALUES(111,'zzz','yyy',100,99);"""
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, 
`__DORIS_SEQUENCE_COL__`) VALUES(111,'hhh','mmm',200,88);"""
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, 
`__DORIS_SEQUENCE_COL__`) VALUES(222,'qqq','ppp',300,77);"""
+    sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, 
`__DORIS_SEQUENCE_COL__`) VALUES(222,'xxx','www',400,99);"""
+
+    sql "set show_hidden_columns = true;"
+    sql "sync;"
+    order_qt_sql "select * from ${tableName};"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to