This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9e12a4050 [FIX](agg) fix vertical_compaction_reader for agg table 
with array/map type (#33130)
8d9e12a4050 is described below

commit 8d9e12a405084151fe41ff191f7905a02e844af5
Author: amory <wangqian...@selectdb.com>
AuthorDate: Wed Apr 3 15:20:54 2024 +0800

    [FIX](agg) fix vertical_compaction_reader for agg table with array/map type 
(#33130)
---
 be/src/vec/columns/column.h                        |   3 +
 be/src/vec/columns/column_array.h                  |   1 +
 be/src/vec/columns/column_const.h                  |   2 +
 be/src/vec/columns/column_map.h                    |   1 +
 be/src/vec/columns/column_nullable.h               |   2 +-
 be/src/vec/columns/column_string.h                 |   8 +-
 be/src/vec/olap/block_reader.cpp                   |  17 +--
 be/src/vec/olap/vertical_block_reader.cpp          |  15 +-
 be/src/vec/olap/vertical_block_reader.h            |   2 +-
 .../test_compaction_agg_keys_with_array_map.out    |  13 ++
 .../test_compaction_agg_keys_with_array_map.groovy | 154 +++++++++++++++++++++
 11 files changed, 189 insertions(+), 29 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index ed62354b1bb..0dd08646d8f 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -628,6 +628,9 @@ public:
     /// Implies is_fixed_and_contiguous.
     virtual bool is_numeric() const { return false; }
 
+    // Returns true if this column is ColumnString/ColumnArray/ColumnMap or another
+    // column type whose value length can vary from row to row.
+    virtual bool is_variable_length() const { return false; }
+
     virtual bool is_column_string() const { return false; }
 
     virtual bool is_column_decimal() const { return false; }
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 24044fc8bce..aa5ded3767c 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -126,6 +126,7 @@ public:
     std::string get_name() const override;
     const char* get_family_name() const override { return "Array"; }
     bool is_column_array() const override { return true; }
+    bool is_variable_length() const override { return true; }
     MutableColumnPtr clone_resized(size_t size) const override;
     size_t size() const override;
     void resize(size_t n) override;
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index cda4ac4ba93..66db2ed54f0 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -105,6 +105,8 @@ public:
         return convert_to_full_column()->convert_to_full_column_if_const();
     }
 
+    bool is_variable_length() const override { return 
data->is_variable_length(); }
+
     ColumnPtr remove_low_cardinality() const;
 
     std::string get_name() const override { return "Const(" + data->get_name() 
+ ")"; }
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 9926a62e54d..593ccd06c9c 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -94,6 +94,7 @@ public:
     ColumnPtr convert_to_full_column_if_const() const override;
 
     MutableColumnPtr clone_resized(size_t size) const override;
+    bool is_variable_length() const override { return true; }
 
     Field operator[](size_t n) const override;
     void get(size_t n, Field& res) const override;
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 8e48bf1fc3f..5b2d9f124e0 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -82,7 +82,7 @@ public:
     }
 
     MutableColumnPtr get_shrinked_column() override;
-
+    bool is_variable_length() const override { return 
nested_column->is_variable_length(); }
     const char* get_family_name() const override { return "Nullable"; }
     std::string get_name() const override { return "Nullable(" + 
nested_column->get_name() + ")"; }
     MutableColumnPtr clone_resized(size_t size) const override;
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 062812315ee..c7393839501 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -106,7 +106,7 @@ private:
 
 public:
     void sanity_check() const;
-
+    bool is_variable_length() const override { return true; }
     const char* get_family_name() const override { return "String"; }
 
     size_t size() const override { return offsets.size(); }
@@ -544,11 +544,17 @@ public:
     }
 
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
+        // We check this column's size against self_row because we need to make sure,
+        // when replace_column_data() is called with a batch of column data,
+        // that this column's data was cleared at the very beginning;
+        // then we replace the columns one by one.
         DCHECK(size() > self_row);
         const auto& r = assert_cast<const ColumnString&>(rhs);
         auto data = r.get_data_at(row);
 
         if (!self_row) {
+            // self_row == 0 means this is the first call to replace_column_data()
+            // with batch column data, so we should clear the previous batch's data.
             chars.clear();
             offsets[self_row] = data.size;
         } else {
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index aa8f8861ecb..e2f37fee010 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -193,22 +193,7 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
         _agg_places.push_back(place);
 
         // calculate `_has_variable_length_tag` tag. like string, array, map
-        _stored_has_variable_length_tag[idx] =
-                _stored_data_columns[idx]->is_column_string() ||
-                (_stored_data_columns[idx]->is_nullable() &&
-                 
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
-                         ->get_nested_column_ptr()
-                         ->is_column_string()) ||
-                _stored_data_columns[idx]->is_column_array() ||
-                (_stored_data_columns[idx]->is_nullable() &&
-                 
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
-                         ->get_nested_column_ptr()
-                         ->is_column_array()) ||
-                _stored_data_columns[idx]->is_column_map() ||
-                (_stored_data_columns[idx]->is_nullable() &&
-                 
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
-                         ->get_nested_column_ptr()
-                         ->is_column_map());
+        _stored_has_variable_length_tag[idx] = 
_stored_data_columns[idx]->is_variable_length();
     }
 }
 
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 3fc3d52f9bc..4fa518d58ac 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -181,7 +181,7 @@ void VerticalBlockReader::_init_agg_state(const 
ReaderParams& read_params) {
             
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
 
     _stored_has_null_tag.resize(_stored_data_columns.size());
-    _stored_has_string_tag.resize(_stored_data_columns.size());
+    _stored_has_variable_length_tag.resize(_stored_data_columns.size());
 
     auto& tablet_schema = *_tablet_schema;
     for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
@@ -198,13 +198,8 @@ void VerticalBlockReader::_init_agg_state(const 
ReaderParams& read_params) {
         });
         _agg_places.push_back(place);
 
-        // calculate `has_string` tag.
-        _stored_has_string_tag[idx] =
-                _stored_data_columns[idx]->is_column_string() ||
-                (_stored_data_columns[idx]->is_nullable() &&
-                 
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
-                         ->get_nested_column_ptr()
-                         ->is_column_string());
+        // Calculate the `_has_variable_length_tag` flag for variable-length types such as string, array and map.
+        _stored_has_variable_length_tag[idx] = 
_stored_data_columns[idx]->is_variable_length();
     }
 }
 
@@ -333,8 +328,8 @@ size_t VerticalBlockReader::_copy_agg_data() {
     }
     for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
         auto& dst_column = _stored_data_columns[idx];
-        if (_stored_has_string_tag[idx]) {
-            //string type should replace ordered
+        if (_stored_has_variable_length_tag[idx]) {
+            // Variable-length types must have their data replaced in row order.
             for (size_t i = 0; i < copy_size; i++) {
                 auto& ref = _stored_row_ref[i];
                 
dst_column->replace_column_data(*ref.block->get_by_position(idx).column,
diff --git a/be/src/vec/olap/vertical_block_reader.h 
b/be/src/vec/olap/vertical_block_reader.h
index 2c65fd616b1..77a01587b58 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -119,7 +119,7 @@ private:
     std::vector<IteratorRowRef> _stored_row_ref;
 
     std::vector<bool> _stored_has_null_tag;
-    std::vector<bool> _stored_has_string_tag;
+    std::vector<bool> _stored_has_variable_length_tag;
 
     phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> 
_temp_ref_map;
 
diff --git 
a/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out 
b/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out
new file mode 100644
index 00000000000..5c5ee3ed329
--- /dev/null
+++ 
b/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_default --
+1      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       ["a", "b", "c"] {"b":1, 
"c":2}
+2      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       ["amory", "doris", 
"2024-04-29"]        {"c":2}
+3      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       \N      \N
+4      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       [null, "sdf"]   \N
+
+-- !select_default2 --
+1      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       ["a", "b", "c"] {"b":1, 
"c":2}
+2      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       ["amory", "doris", 
"2024-04-29"]        {"c":2}
+3      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       \N      \N
+4      2017-10-01      2017-10-01      2017-10-01T11:11:11.110 
2017-10-01T11:11:11.110111      Beijing 10      1       [null, "sdf"]   \N
+
diff --git 
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
new file mode 100644
index 00000000000..1556d2f00a5
--- /dev/null
+++ 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_compaction_agg_keys_with_array_map") {
+    def tableName = "compaction_agg_keys_regression_test_complex"
+
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+        backend_id = backendId_to_backendIP.keySet()[0]
+        
+        def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
+            }
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `user_id` LARGEINT NOT NULL COMMENT "用户id",
+                `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+                `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
+                `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
+                `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
+                `city` VARCHAR(20) COMMENT "用户所在城市",
+                `age` SMALLINT COMMENT "用户年龄",
+                `sex` TINYINT COMMENT "用户性别",
+                `array_col` ARRAY<STRING> REPLACE NULL COMMENT "array column",
+                `map_col` MAP<STRING, INT> REPLACE NULL COMMENT "map column")
+            AGGREGATE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, 
`datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
+            PROPERTIES ( "replication_num" = "1" );
+        """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b'], map('a', 1));
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b', 'c'], map('b', 1, 
'c', 2));
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', 'commiter'], 
map('b', 1));
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', 
'2024-04-29'], map('c', 2));
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map());
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map('a', 
1, 'b', 2));
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL);
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', 
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, [NULL, 'sdf'], NULL);
+            """
+
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
+
+        
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
+
+        // trigger compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = 
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            if (compactJson.status.toLowerCase() == "fail") {
+                assertEquals(disableAutoCompaction, false)
+                logger.info("Compaction was done automatically!")
+            }
+            if (disableAutoCompaction) {
+                assertEquals("success", compactJson.status.toLowerCase())
+            }
+        }
+
+        // wait for all compactions done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        def replicaNum = get_table_replica_num(tableName)
+        logger.info("get table replica num: " + replicaNum)
+        int rowCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                rowCount += Integer.parseInt(rowset.split(" ")[1])
+            }
+        }
+        assert (rowCount < 8 * replicaNum)
+        qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to