This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new fc715acc1a7 add config variant_max_sparse_column_statistics_size and
fix test (#49632)
fc715acc1a7 is described below
commit fc715acc1a70806b8fc3c1d9a97807418b703940
Author: lihangyu <[email protected]>
AuthorDate: Mon Mar 31 21:52:36 2025 +0800
add config variant_max_sparse_column_statistics_size and fix test (#49632)
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 3 +
be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +-
.../segment_v2/variant_column_writer_impl.cpp | 4 +-
.../rowset/segment_v2/variant_column_writer_impl.h | 8 +-
be/src/vec/columns/column_object.cpp | 38 ++-----
be/src/vec/common/schema_util.cpp | 8 +-
.../variant_column_writer_reader_test.cpp | 10 +-
be/test/vec/common/schema_util_test.cpp | 6 +-
.../test_variant_compaction_with_sparse_limit.out | Bin 0 -> 5787 bytes
regression-test/data/variant_p0/load.out | Bin 16080 -> 16265 bytes
...est_variant_compaction_with_sparse_limit.groovy | 123 +++++++++++++++++++++
regression-test/suites/variant_p0/load.groovy | 2 +-
13 files changed, 157 insertions(+), 49 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b9270703d3e..58c38858a09 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1257,6 +1257,8 @@ DEFINE_Bool(enable_snapshot_action, "false");
DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
+DEFINE_mInt32(variant_max_sparse_column_statistics_size, "10000");
+
DEFINE_mBool(enable_column_type_check, "true");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 780b382f103..65728fbb050 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1337,6 +1337,9 @@ DECLARE_Bool(enable_snapshot_action);
// The max columns size for a tablet schema
DECLARE_mInt32(variant_max_merged_tablet_schema_size);
+// Max number of sparse paths counted in a variant column's statistics; extra paths are skipped
+DECLARE_mInt32(variant_max_sparse_column_statistics_size);
+
DECLARE_mInt64(local_exchange_buffer_mem_limit);
DECLARE_mInt64(enable_debug_log_timeout_secs);
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 2edbb3a1350..881e4edaefa 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -435,7 +435,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator**
iterator, const Tablet
// which means the path maybe exist in sparse_column
bool exceeded_sparse_column_limit =
!_statistics->sparse_column_non_null_size.empty() &&
_statistics->sparse_column_non_null_size.size() ==
-
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
+
config::variant_max_sparse_column_statistics_size;
// For compaction operations, read flat leaves, otherwise read
hierarchical data
// Since the variant subcolumns are flattened in
schema_util::get_compaction_schema
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index ae1f2a8de61..1a8e8ed38af 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -222,7 +222,7 @@ Status
VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
paths.emplace(path);
}
// // todo : Add all remaining paths into shared data statistics
until we reach its max size;
- // else if (new_statistics.sparse_data_paths_statistics.size() <
Statistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ // else if (new_statistics.sparse_data_paths_statistics.size() <
config::variant_max_sparse_column_statistics_size) {
// new_statistics.sparse_data_paths_statistics.emplace(path,
size);
// }
}
@@ -421,7 +421,7 @@ Status VariantColumnWriterImpl::_process_sparse_column(
it != sparse_data_paths_statistics.end()) {
++it->second;
} else if (sparse_data_paths_statistics.size() <
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ config::variant_max_sparse_column_statistics_size) {
sparse_data_paths_statistics.emplace(path, 1);
}
}
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 9f67cf04505..42beb41bf8a 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -36,13 +36,7 @@ class ColumnWriter;
class ScalarColumnWriter;
struct VariantStatistics {
- // #ifdef BE_TEST
- // static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10;
- // #else
- // // If reached the size of this, we should stop writing statistics
for sparse data
- // static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
- // #endif
- static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
+ // sparse_column_non_null_size holds at most config::variant_max_sparse_column_statistics_size entries
std::map<std::string, size_t> subcolumns_non_null_size;
std::map<std::string, size_t> sparse_column_non_null_size;
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 448ff7df4c1..7947c751246 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -888,32 +888,6 @@ void ColumnObject::check_consistency() const {
"unmatched sparse column:, expeted rows: {},
but meet: {}", num_rows,
serialized_sparse_column->size());
}
-
-#ifndef NDEBUG
- bool error = false;
- auto [path, value] = get_sparse_data_paths_and_values();
-
- auto& offsets = serialized_sparse_column_offsets();
- for (size_t row = 0; row != num_rows; ++row) {
- size_t offset = offsets[row - 1];
- size_t end = offsets[row];
- // Iterator over [path, binary value]
- for (size_t i = offset; i != end; ++i) {
- const StringRef sparse_path_string = path->get_data_at(i);
- const std::string_view sparse_path(sparse_path_string);
-
- const PathInData column_path(sparse_path);
- if (auto* subcolumn = get_subcolumn(column_path); subcolumn !=
nullptr) {
- LOG(WARNING) << "err path: " << sparse_path;
- error = true;
- }
- }
- }
- if (error) {
- throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "path {} both exists in subcolumn and sparse
columns");
- }
-#endif
}
size_t ColumnObject::size() const {
@@ -1810,6 +1784,18 @@ bool ColumnObject::is_visible_root_value(size_t nrow)
const {
if (root->data.is_null_at(nrow)) {
return false;
}
+ for (const auto& subcolumn : subcolumns) {
+ if (subcolumn->data.is_root) {
+ continue; // Skip the root column
+ }
+
+ // A non-null non-root subcolumn at this row means the root value is not visible; return early
+ if (!assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(
+ *subcolumn->data.get_finalized_column_ptr())
+ .is_null_at(nrow)) {
+ return false;
+ }
+ }
if (root->data.least_common_type.get_base_type_id() == TypeIndex::VARIANT)
{
// nested field
return !root->data.is_empty_nested(nrow);
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index 6298daffd8f..dd715b5267f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -716,7 +716,7 @@ Status check_path_stats(const std::vector<RowsetSharedPtr>&
intputs, RowsetShare
// In input rowsets, some rowsets may have statistics values exceeding
the maximum limit,
// which leads to inaccurate statistics
- if (stats.size() > VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE)
{
+ if (stats.size() > config::variant_max_sparse_column_statistics_size) {
// When there is only one segment, we can ensure that the size of
each path in output stats is accurate
if (output->num_segments() == 1) {
for (const auto& [path, size] : stats) {
@@ -841,19 +841,19 @@ void calculate_variant_stats(const IColumn&
encoded_sparse_column,
}
// If path doesn't exist and we haven't hit the max statistics
size limit,
// add it with count 1
- else if (count_map.size() <
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ else if (count_map.size() <
config::variant_max_sparse_column_statistics_size) {
count_map.emplace(sparse_path, 1);
}
}
}
if (stats->sparse_column_non_null_size().size() >
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ config::variant_max_sparse_column_statistics_size) {
throw doris::Exception(
ErrorCode::INTERNAL_ERROR,
"Sparse column non null size: {} is greater than max
statistics size: {}",
stats->sparse_column_non_null_size().size(),
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
+ config::variant_max_sparse_column_statistics_size);
}
}
diff --git
a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index df8428ac877..476f1136992 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -365,15 +365,15 @@ TEST_F(VariantColumnWriterReaderTest,
test_write_data_normal) {
// 13. check statistics size == limit
auto& variant_stats = variant_column_reader->_statistics;
EXPECT_TRUE(variant_stats->sparse_column_non_null_size.size() <
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
- auto limit = VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE -
+ config::variant_max_sparse_column_statistics_size);
+ auto limit = config::variant_max_sparse_column_statistics_size -
variant_stats->sparse_column_non_null_size.size();
for (int i = 0; i < limit; ++i) {
std::string key = parent_column.name_lower_case() + ".key10" +
std::to_string(i);
variant_stats->sparse_column_non_null_size[key] = 10000;
}
EXPECT_TRUE(variant_stats->sparse_column_non_null_size.size() ==
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
+ config::variant_max_sparse_column_statistics_size);
st = variant_column_reader->new_iterator(&it, subcolumn,
&storage_read_opts);
EXPECT_TRUE(st.ok()) << st.msg();
@@ -607,7 +607,7 @@ TEST_F(VariantColumnWriterReaderTest,
test_write_data_advanced) {
EXPECT_EQ(value, inserted_jsonstr[i]);
}
- auto read_to_column_object = [&]() {
+ auto read_to_column_object = [&]() {
new_column_object = ColumnObject::create(10);
nrows = 1000;
st = it->seek_to_ordinal(0);
@@ -641,7 +641,7 @@ TEST_F(VariantColumnWriterReaderTest,
test_write_data_advanced) {
for (int row = 0; row < 1000; ++row) {
std::string value;
st = assert_cast<ColumnObject*>(new_column_object.get())
- ->serialize_one_row_to_string(row, &value);
+ ->serialize_one_row_to_string(row, &value);
EXPECT_TRUE(st.ok()) << st.msg();
if (value.find("nested" + key_num) != std::string::npos) {
key_nested_count++;
diff --git a/be/test/vec/common/schema_util_test.cpp
b/be/test/vec/common/schema_util_test.cpp
index ffa790ddf49..7fd7eb6877b 100644
--- a/be/test/vec/common/schema_util_test.cpp
+++ b/be/test/vec/common/schema_util_test.cpp
@@ -208,10 +208,10 @@ TEST_F(SchemaUtilTest, calculate_variant_stats) {
// test with max size
column_map->clear();
const auto& key_value_counts3 = construct_column_map_with_random_values(
- column_map, VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE, 5,
"key2_");
+ column_map, config::variant_max_sparse_column_statistics_size, 5,
"key2_");
schema_util::calculate_variant_stats(*column_map, &stats, 0,
-
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
- EXPECT_EQ(VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE,
+
config::variant_max_sparse_column_statistics_size);
+ EXPECT_EQ(config::variant_max_sparse_column_statistics_size,
stats.sparse_column_non_null_size_size());
for (const auto& [path, size] : stats.sparse_column_non_null_size()) {
diff --git
a/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out
b/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out
new file mode 100644
index 00000000000..a5c4281ee98
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/test_variant_compaction_with_sparse_limit.out
differ
diff --git a/regression-test/data/variant_p0/load.out
b/regression-test/data/variant_p0/load.out
index dbdee9d7940..ecbfb38a747 100644
Binary files a/regression-test/data/variant_p0/load.out and
b/regression-test/data/variant_p0/load.out differ
diff --git
a/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy
b/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy
new file mode 100644
index 00000000000..4cc336a2034
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_variant_compaction_with_sparse_limit.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_compaction_variant_with_sparse_limit", "nonConcurrent") {
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+ try {
+ String backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ boolean disableAutoCompaction = true
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+ disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
+ }
+ }
+
+ set_be_config("variant_max_sparse_column_statistics_size", "2")
+ def create_table = { tableName, buckets="auto", key_type="DUPLICATE" ->
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ def var_def = "variant"
+ if (key_type == "AGGREGATE") {
+ var_def = "variant replace"
+ }
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ k bigint,
+ v ${var_def}
+ )
+ ${key_type} KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+ properties("replication_num" = "1", "disable_auto_compaction"
= "true");
+ """
+ }
+
+ def key_types = ["DUPLICATE", "UNIQUE", "AGGREGATE"]
+ // def key_types = ["AGGREGATE"]
+ for (int i = 0; i < key_types.size(); i++) {
+ def tableName = "simple_variant_${key_types[i]}"
+ // 1. simple cases
+ create_table.call(tableName, "1", key_types[i])
+ def insert = {
+ sql """insert into ${tableName} values (1, '{"x" :
[1]}'),(13, '{"a" : 1}');"""
+ sql """insert into ${tableName} values (2, '{"a" :
"1"}'),(14, '{"a" : [[[1]]]}');"""
+ sql """insert into ${tableName} values (3, '{"x" :
[3]}'),(15, '{"a" : 1}')"""
+ sql """insert into ${tableName} values (4, '{"y": 1}'),(16,
'{"a" : "1223"}');"""
+ sql """insert into ${tableName} values (5, '{"z" :
2.0}'),(17, '{"a" : [1]}');"""
+ sql """insert into ${tableName} values (6, '{"x" :
111}'),(18, '{"a" : ["1", 2, 1.1]}');"""
+ sql """insert into ${tableName} values (7, '{"m" : 1}'),(19,
'{"a" : 1, "b" : {"c" : 1}}');"""
+ sql """insert into ${tableName} values (8, '{"l" : 2}'),(20,
'{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${tableName} values (9, '{"g" :
1.11}'),(21, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${tableName} values (10, '{"z" :
1.1111}'),(22, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${tableName} values (11, '{"sala" :
0}'),(1999, '{"a" : 1, "b" : {"c" : 1}}'),(19921, '{"a" : 1, "b" : 10}');"""
+ sql """insert into ${tableName} values (12, '{"dddd" :
0.1}'),(1022, '{"a" : 1, "b" : 10}'),(1029, '{"a" : 1, "b" : {"c" : 1}}');"""
+ }
+ insert.call();
+ insert.call();
+ qt_sql_1 "SELECT * FROM ${tableName} ORDER BY k, cast(v as
string); "
+ qt_sql_2 "select k, cast(v['a'] as array<int>) from ${tableName}
where size(cast(v['a'] as array<int>)) > 0 order by k"
+ qt_sql_3 "select k, v['a'], cast(v['b'] as string) from
${tableName} where length(cast(v['b'] as string)) > 4 order by k"
+ qt_sql_5 "select cast(v['b'] as string), cast(v['b']['c'] as
string) from ${tableName} where cast(v['b'] as string) != 'null' and
cast(v['b'] as string) != '{}' order by k desc, 1, 2 limit 10;"
+
+
+
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
+ }
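+ // NOTE(review): rowCount is computed but never asserted; restore a check (e.g. rowCount < 8) once the expected value is confirmed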
+ qt_sql_11 "SELECT * FROM ${tableName} ORDER BY k, cast(v as
string); "
+ qt_sql_22 "select k, cast(v['a'] as array<int>) from ${tableName}
where size(cast(v['a'] as array<int>)) > 0 order by k"
+ qt_sql_33 "select k, v['a'], cast(v['b'] as string) from
${tableName} where length(cast(v['b'] as string)) > 4 order by k"
+ qt_sql_55 "select cast(v['b'] as string), cast(v['b']['c'] as
string) from ${tableName} where cast(v['b'] as string) != 'null' and
cast(v['b'] as string) != '{}' order by k desc limit 10;"
+ }
+
+ } finally {
+ // restore the default from config.cpp (10000) so later suites are unaffected
+ set_be_config("variant_max_sparse_column_statistics_size", "10000")
+ }
+}
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index 267f86e7e57..c2e54dffe3c 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -96,7 +96,7 @@ suite("regression_test_variant", "p0"){
sql """insert into ${table_name} values (11, '[123.1]'),(1999,
'{"a" : 1, "b" : {"c" : 1}}'),(19921, '{"a" : 1, "b" : 10}');"""
sql """insert into ${table_name} values (12, '[123.2]'),(1022,
'{"a" : 1, "b" : 10}'),(1029, '{"a" : 1, "b" : {"c" : 1}}');"""
qt_sql1 "select k, cast(v['a'] as array<int>) from ${table_name}
where size(cast(v['a'] as array<int>)) > 0 order by k, cast(v['a'] as string)
asc"
- qt_sql2 "select k, cast(v as int), cast(v['b'] as string) from
${table_name} where length(cast(v['b'] as string)) > 4 order by k, cast(v as
string), cast(v['b'] as string) "
+ qt_sql2 "select k, cast(v as string), cast(v['b'] as string) from
${table_name} where length(cast(v['b'] as string)) > 4 order by k, cast(v as
string), cast(v['b'] as string) "
qt_sql3 "select k, v from ${table_name} order by k, cast(v as
string) limit 5"
qt_sql4 "select v['b'], v['b']['c'], cast(v as int) from
${table_name} where cast(v['b'] as string) != 'null' and cast(v['b'] as string)
is not null and cast(v['b'] as string) != '{}' order by k,cast(v as string)
desc limit 10000;"
qt_sql5 "select v['b'] from ${table_name} where cast(v['b'] as
int) > 0;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]