This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 41b11b844485233658073ca3dfb29b565330300b
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Mon Feb 17 11:08:50 2025 +0800

    [improve](variant) Add max count variant sparse (#47541)
---
 be/src/apache-orc | 2 +-
 be/src/clucene | 2 +-
 be/src/common/config.cpp | 1 -
 be/src/common/config.h | 2 -
 be/src/olap/compaction.cpp | 6 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp | 16 +-
 .../rowset/segment_v2/hierarchical_data_reader.cpp | 17 +-
 .../rowset/segment_v2/hierarchical_data_reader.h | 5 +-
 be/src/olap/rowset/segment_v2/segment.cpp | 39 ++-
 be/src/olap/rowset/segment_v2/segment.h | 17 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 8 +-
 be/src/olap/rowset/segment_v2/segment_iterator.h | 9 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp | 1 +
 .../segment_v2/variant_column_writer_impl.cpp | 15 +-
 .../rowset/segment_v2/variant_column_writer_impl.h | 1 +
 .../rowset/segment_v2/vertical_segment_writer.cpp | 1 +
 be/src/olap/tablet_meta.cpp | 3 +
 be/src/olap/tablet_schema.cpp | 11 +-
 be/src/olap/tablet_schema.h | 10 +-
 be/src/runtime/types.cpp | 11 +-
 be/src/runtime/types.h | 13 +-
 be/src/vec/columns/column_object.cpp | 56 ++---
 be/src/vec/columns/column_object.h | 15 +-
 be/src/vec/common/schema_util.cpp | 15 +-
 be/src/vec/core/block.h | 4 +-
 be/src/vec/data_types/data_type_factory.cpp | 13 +-
 be/src/vec/data_types/data_type_object.cpp | 29 ++-
 be/src/vec/data_types/data_type_object.h | 14 +-
 be/src/vec/data_types/get_least_supertype.cpp | 4 -
 .../data_types/serde/data_type_object_serde.cpp | 22 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp | 3 +-
 .../vec/functions/array/function_array_utils.cpp | 11 +-
 be/src/vec/functions/function_cast.h | 9 +-
 be/src/vec/functions/function_variant_element.cpp | 98 +++++++-
 be/test/vec/columns/column_object_test.cpp | 12 +-
 .../java/org/apache/doris/catalog/ScalarType.java | 36 ++-
 .../apache/doris/alter/SchemaChangeHandler.java | 7 +
 .../java/org/apache/doris/analysis/CastExpr.java | 3 +
 .../apache/doris/analysis/FunctionCallExpr.java | 3 +
 .../org/apache/doris/analysis/MVColumnItem.java | 6 +
 .../main/java/org/apache/doris/catalog/Column.java | 7 +
 .../main/java/org/apache/doris/catalog/Env.java | 6 +
 .../java/org/apache/doris/catalog/OlapTable.java | 16 ++
 .../org/apache/doris/catalog/TableProperty.java | 10 +
 .../apache/doris/common/util/PropertyAnalyzer.java | 22 ++
 .../apache/doris/datasource/InternalCatalog.java | 15 ++
 .../rules/rewrite/VariantSubPathPruning.java | 2 +-
 .../expressions/functions/ComputeSignature.java | 1 +
 .../functions/ComputeSignatureHelper.java | 29 +++
 .../functions/generator/ExplodeVariantArray.java | 2 +-
 .../expressions/functions/scalar/ElementAt.java | 4 +-
 .../org/apache/doris/nereids/types/DataType.java | 5 +-
 .../apache/doris/nereids/types/VariantType.java | 20 +-
 .../java/org/apache/doris/qe/SessionVariable.java | 23 ++
 .../doris/statistics/util/StatisticsUtil.java | 3 +-
 .../dialect/trino/TrinoLogicalPlanBuilder.java | 2 +
 gensrc/proto/data.proto | 1 +
 gensrc/proto/olap_file.proto | 1 +
 gensrc/proto/segment_v2.proto | 1 +
 gensrc/proto/types.proto | 3 +
 gensrc/thrift/Types.thrift | 6 +-
 regression-test/data/variant_p0/delete_update.out | Bin 921 -> 921 bytes
 .../data/variant_p0/test_sub_path_pruning.out | Bin 5835 -> 5871 bytes
 regression-test/data/variant_p0/update/load.out | Bin 0 -> 251 bytes
 regression-test/data/variant_p0/update/query.out | Bin 0 -> 3183 bytes
 .../compaction/test_compaction_extract_root.out | Bin 243 -> 266 bytes
 .../suites/variant_p0/delete_update.groovy | 12 +-
 .../suites/variant_p0/element_function.groovy | 3 +-
 regression-test/suites/variant_p0/load.groovy | 1 +
 .../suites/variant_p0/select_partition.groovy | 1 +
 .../suites/variant_p0/test_sub_path_pruning.groovy | 12 +-
 .../suites/variant_p0/update/load.groovy | 123 ++++++++++
 .../suites/variant_p0/update/query.groovy | 262 +++++++++++++++++++++
 .../compaction/compaction_sparse_column.groovy | 8 +-
 regression-test/suites/variant_p2/load.groovy | 2 +-
 75 files changed, 942 insertions(+), 211 deletions(-)

diff --git a/be/src/apache-orc b/be/src/apache-orc
index db01184f765..2f937bdc764 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit db01184f765c03496e4107bd3ac37c077ac4bc5f
+Subproject commit 2f937bdc76406f150b484b6e57629aa8a03d48b6
diff --git a/be/src/clucene b/be/src/clucene
index 48fa9cc4ec3..2204eaec46a 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 48fa9cc4ec32b40bf3b02338d0a1b2cdbc6408cf
+Subproject commit 2204eaec46a68e5e9a1876b7021f24839ecb2cf0
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3c7b2d1d756..bf543837118 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1020,7 +1020,6 @@ DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
 // Whether use schema dict in backend side instead of MetaService side(cloud mode)
 DEFINE_mBool(variant_use_cloud_schema_dict, "true");
 DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
-DEFINE_mInt32(variant_max_subcolumns_count, "5");
 
 // block file cache
 DEFINE_Bool(enable_file_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 419a9439747..32fab21c78b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1221,8 +1221,6 @@ DECLARE_mBool(variant_use_cloud_schema_dict);
 // Treat invalid json format str as string, instead of throwing exception if false
 DECLARE_mBool(variant_throw_exeception_on_invalid_json);
 
-DECLARE_mInt32(variant_max_subcolumns_count);
-
 DECLARE_mBool(enable_merge_on_write_correctness_check);
 // USED FOR DEBUGING
 // core directly if the compaction found there's duplicate key on mow table
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 1e53ddc7364..190970a745f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -340,7 +340,8 @@ void CompactionMixin::build_basic_info() {
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
     std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
                    [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
-    _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+    _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
+                                 ->copy_without_variant_extracted_columns();
 }
 
 bool CompactionMixin::handle_ordered_data_compaction() {
@@ -1104,7 +1105,8 @@ void CloudCompactionMixin::build_basic_info() {
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
     std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
                    [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
-    _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+    _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
+                                 ->copy_without_variant_extracted_columns();
 }
 
 int64_t CloudCompactionMixin::get_compaction_permits() {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp index 97e8b1ca1da..ce026fe3935 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -331,7 +331,21 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>(); const ColumnMetaPB& self_column_pb = footer.columns(column_id); for (const ColumnMetaPB& column_pb : footer.columns()) { - if (column_pb.unique_id() != self_column_pb.unique_id()) { + // Find all columns belonging to the current variant column + // 1. not the variant column + if (!column_pb.has_column_path_info()) { + continue; + } + + // 2. other variant root columns + if (column_pb.type() == (int)FieldType::OLAP_FIELD_TYPE_VARIANT && + column_pb.unique_id() != self_column_pb.unique_id()) { + continue; + } + + // 3. other variant's subcolumns + if (column_pb.type() != (int)FieldType::OLAP_FIELD_TYPE_VARIANT && + column_pb.column_path_info().parrent_column_unique_id() != self_column_pb.unique_id()) { continue; } DCHECK(column_pb.has_column_path_info()); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index 8df56dc05e8..b625c3b2d8a 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -194,7 +194,8 @@ Status HierarchicalDataReader::_process_nested_columns( // into a new object column and wrap it with array column using the first element offsets.The wrapped array column // will type the type of ColumnObject::NESTED_TYPE, whih is Nullable<ColumnArray<NULLABLE(ColumnObject)>>. for (const auto& entry : nested_subcolumns) { - MutableColumnPtr nested_object = ColumnObject::create(true); + MutableColumnPtr nested_object = + ColumnObject::create(container_variant.max_subcolumns_count()); const auto* base_array = check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column)); MutableColumnPtr offset = base_array->get_offsets_ptr()->assume_mutable(); @@ -238,13 +239,13 @@ Status HierarchicalDataReader::_process_nested_columns( parent_path.unset_nested(); DCHECK(!parent_path.has_nested_part()); container_variant.add_sub_column(parent_path, array->assume_mutable(), - ColumnObject::NESTED_TYPE); + container_variant.NESTED_TYPE); } return Status::OK(); } Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& container, - size_t nrows) { + size_t nrows, int32_t max_subcolumns_count) { using namespace vectorized; // build variant as container @@ -262,13 +263,13 @@ Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& con MutableColumnPtr column = _root_reader->column->get_ptr(); // container_variant.add_sub_column({}, std::move(column), _root_reader->type); DCHECK(column->size() == nrows); - container = ColumnObject::create(_root_reader->type, std::move(column)); + container = + ColumnObject::create(max_subcolumns_count, _root_reader->type, std::move(column)); } else { auto root_type = - vectorized::DataTypeFactory::instance().create_data_type(TypeIndex::Nothing, true); - MutableColumnPtr column = root_type->create_column(); - column->insert_many_defaults(nrows); - container = ColumnObject::create(root_type, std::move(column)); + vectorized::DataTypeFactory::instance().create_data_type(TypeIndex::Nothing, false); + auto column = vectorized::ColumnNothing::create(nrows); + container = ColumnObject::create(max_subcolumns_count, 
root_type, std::move(column)); } auto& container_variant = assert_cast<ColumnObject&>(*container); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index c50ac26169e..af9a584fbc1 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -109,7 +109,8 @@ private: // 3. init container with subcolumns // 4. init container with nested subcolumns // 5. init container with sparse column - Status _init_container(vectorized::MutableColumnPtr& container, size_t nrows); + Status _init_container(vectorized::MutableColumnPtr& container, size_t nrows, + int max_subcolumns_count); // clear all subcolumns's column data for next batch read // set null map for nullable column @@ -147,7 +148,7 @@ private: } MutableColumnPtr container; - RETURN_IF_ERROR(_init_container(container, nrows)); + RETURN_IF_ERROR(_init_container(container, nrows, variant.max_subcolumns_count())); auto& container_variant = assert_cast<ColumnObject&>(*container); variant.insert_range_from(container_variant, 0, nrows); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 1e6508de0e2..f76a5e1b5e7 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -555,14 +555,13 @@ Status Segment::healthy_status() { // Return the storage datatype of related column to field. // Return nullptr meaning no such storage infomation for this column -vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier, +vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column, bool read_flat_leaves) const { // Path has higher priority - auto relative_path = identifier.path != nullptr ? identifier.path->copy_pop_front() - : vectorized::PathInData(); + auto path = column.path_info_ptr(); + auto relative_path = path != nullptr ? path->copy_pop_front() : vectorized::PathInData(); if (!relative_path.empty()) { - int32_t unique_id = - identifier.unique_id > 0 ? identifier.unique_id : identifier.parent_unique_id; + int32_t unique_id = column.unique_id() > 0 ? column.unique_id() : column.parent_unique_id(); const auto* node = _column_readers.contains(unique_id) ? ((VariantColumnReader*)(_column_readers.at(unique_id).get())) ->get_reader_by_path(relative_path) @@ -577,9 +576,11 @@ vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identi return nullptr; } // it contains children or column missing in storage, so treat it as variant - return identifier.is_nullable - ? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>()) - : std::make_shared<vectorized::DataTypeObject>(); + return column.is_nullable() + ? 
vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>( + column.variant_max_subcolumns_count())) + : std::make_shared<vectorized::DataTypeObject>( + column.variant_max_subcolumns_count()); } // TODO support normal column type return nullptr; @@ -1012,12 +1013,7 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) { bool Segment::same_with_storage_type(int32_t cid, const Schema& schema, bool read_flat_leaves) const { const auto* col = schema.column(cid); - auto file_column_type = - get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(), - .parent_unique_id = col->parent_unique_id(), - .path = col->path(), - .is_nullable = col->is_nullable()}, - read_flat_leaves); + auto file_column_type = get_data_type_of(col->get_desc(), read_flat_leaves); auto expected_type = Schema::get_data_type_ptr(*col); #ifndef NDEBUG if (file_column_type && !file_column_type->equals(*expected_type)) { @@ -1045,18 +1041,13 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto }; std::vector<segment_v2::rowid_t> single_row_loc {row_id}; if (!slot->column_paths().empty()) { - vectorized::PathInDataPtr path = std::make_shared<vectorized::PathInData>( - schema.column_by_uid(slot->col_unique_id()).name_lower_case(), - slot->column_paths()); - auto storage_type = get_data_type_of(ColumnIdentifier {.unique_id = slot->col_unique_id(), - .path = path, - .is_nullable = slot->is_nullable()}, - false); - vectorized::MutableColumnPtr file_storage_column = storage_type->create_column(); - DCHECK(storage_type != nullptr); TabletColumn column = TabletColumn::create_materialized_variant_column( schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(), - slot->col_unique_id()); + slot->col_unique_id(), slot->type().max_subcolumns_count()); + auto storage_type = get_data_type_of(column, false); + vectorized::MutableColumnPtr file_storage_column = storage_type->create_column(); + DCHECK(storage_type != nullptr); + if (iterator_hint == nullptr) { RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_opt)); RETURN_IF_ERROR(iterator_hint->init(opt)); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 9fe545006e3..03b0581e783 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -163,19 +163,12 @@ public: int64_t meta_mem_usage() const { return _meta_mem_usage; } - // Identify the column by unique id or path info - struct ColumnIdentifier { - int32_t unique_id = -1; - int32_t parent_unique_id = -1; - vectorized::PathInDataPtr path; - bool is_nullable = false; - }; // Get the inner file column's data type // ignore_chidren set to false will treat field as variant // when it contains children with field paths. 
// nullptr will returned if storage type does not contains such column - std::shared_ptr<const vectorized::IDataType> get_data_type_of( - const ColumnIdentifier& identifier, bool read_flat_leaves) const; + std::shared_ptr<const vectorized::IDataType> get_data_type_of(const TabletColumn& column, + bool read_flat_leaves) const; // Check is schema read type equals storage column type bool same_with_storage_type(int32_t cid, const Schema& schema, bool read_flat_leaves) const; @@ -185,11 +178,7 @@ public: ReaderType read_type) const { const Field* col = schema.column(cid); vectorized::DataTypePtr storage_column_type = - get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(), - .parent_unique_id = col->parent_unique_id(), - .path = col->path(), - .is_nullable = col->is_nullable()}, - read_type != ReaderType::READER_QUERY); + get_data_type_of(col->get_desc(), read_type != ReaderType::READER_QUERY); if (storage_column_type == nullptr) { // Default column iterator return true; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 96b0bea2ae8..7bcbee7084a 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -315,13 +315,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) { const Field* col = _schema->column(i); if (col) { auto storage_type = _segment->get_data_type_of( - Segment::ColumnIdentifier { - col->unique_id(), - col->parent_unique_id(), - col->path(), - col->is_nullable(), - }, - _opts.io_ctx.reader_type != ReaderType::READER_QUERY); + col->get_desc(), _opts.io_ctx.reader_type != ReaderType::READER_QUERY); if (storage_type == nullptr) { storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 5588661302d..9b7f078dc30 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -263,13 +263,8 @@ private: if (block_cid >= block->columns()) { continue; } - vectorized::DataTypePtr storage_type = _segment->get_data_type_of( - Segment::ColumnIdentifier { - .unique_id = _schema->column(cid)->unique_id(), - .parent_unique_id = _schema->column(cid)->parent_unique_id(), - .path = _schema->column(cid)->path(), - .is_nullable = _schema->column(cid)->is_nullable()}, - false); + vectorized::DataTypePtr storage_type = + _segment->get_data_type_of(_schema->column(cid)->get_desc(), false); if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) { // Do additional cast vectorized::MutableColumnPtr tmp = storage_type->create_column(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 60fd7cea28a..4cd91d22a3c 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -166,6 +166,7 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, meta->set_result_is_nullable(column.get_result_is_nullable()); meta->set_function_name(column.get_aggregation_name()); meta->set_be_exec_version(column.get_be_exec_version()); + meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); } Status SegmentWriter::init() { diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 
442595eb0c0..bb8558ee3d4 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -47,8 +47,9 @@ Status VariantColumnWriterImpl::init() { // caculate stats info std::set<std::string> subcolumn_paths; RETURN_IF_ERROR(_get_subcolumn_paths_from_stats(subcolumn_paths)); - - auto col = vectorized::ColumnObject::create(true); + DCHECK(_tablet_column->variant_max_subcolumns_count() >= 0) + << "max subcolumns count is: " << _tablet_column->variant_max_subcolumns_count(); + auto col = vectorized::ColumnObject::create(_tablet_column->variant_max_subcolumns_count()); for (const auto& str_path : subcolumn_paths) { DCHECK(col->add_sub_column(vectorized::PathInData(str_path), 0)); } @@ -100,7 +101,11 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st } // Check if the number of all subcolumn paths exceeds the limit. - if (path_to_total_number_of_non_null_values.size() > config::variant_max_subcolumns_count) { + DCHECK(_tablet_column->variant_max_subcolumns_count() >= 0) + << "max subcolumns count is: " << _tablet_column->variant_max_subcolumns_count(); + if (_tablet_column->variant_max_subcolumns_count() && + path_to_total_number_of_non_null_values.size() > + _tablet_column->variant_max_subcolumns_count()) { // Sort paths by total number of non null values. std::vector<std::pair<size_t, std::string_view>> paths_with_sizes; paths_with_sizes.reserve(path_to_total_number_of_non_null_values.size()); @@ -111,7 +116,7 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st // Fill subcolumn_paths with first subcolumn paths in sorted list. // reserve 1 for root column for (const auto& [size, path] : paths_with_sizes) { - if (paths.size() < config::variant_max_subcolumns_count) { + if (paths.size() < _tablet_column->variant_max_subcolumns_count()) { VLOG_DEBUG << "pick " << path << " as subcolumn"; paths.emplace(path); } @@ -204,7 +209,7 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt auto full_path = full_path_builder.append(_tablet_column->name_lower_case(), false) .append(entry->path.get_parts(), false) .build(); - // set unique_id and parent_unique_id, will use unique_id to get iterator correct + // set unique_id and parent_unique_id, will use parent_unique_id to get iterator correct return vectorized::schema_util::get_column_by_type( final_data_type_from_object, column_name, vectorized::schema_util::ExtraInfo {.unique_id = _tablet_column->unique_id(), diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index d9298d42db7..1f9d5d3191a 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -65,6 +65,7 @@ public: Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows); private: + // not including root column void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column); // subcolumn path from variant stats info to distinguish from sparse column diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 1b6da1dbf4c..c06a568fb95 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -168,6 +168,7 @@ void 
VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i)); } + meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); } Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 9a27b95dbcd..ae58464974e 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -440,6 +440,9 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco init_column_from_tcolumn(tcolumn.children_column[i].col_unique_id, tcolumn.children_column[i], children_column); } + if (tcolumn.column_type.__isset.variant_max_subcolumns_count) { + column->set_variant_max_subcolumns_count(tcolumn.column_type.variant_max_subcolumns_count); + } } Status TabletMeta::create_from_file(const string& file_path) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c4f96e22148..e1b82cc015f 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -581,11 +581,14 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _sparse_cols.emplace_back(std::make_shared<TabletColumn>(std::move(column))); _num_sparse_columns++; } + if (column.has_variant_max_subcolumns_count()) { + _variant_max_subcolumns_count = column.variant_max_subcolumns_count(); + } } -TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root, - const std::vector<std::string>& paths, - int32_t parent_unique_id) { +TabletColumn TabletColumn::create_materialized_variant_column( + const std::string& root, const std::vector<std::string>& paths, int32_t parent_unique_id, + int32_t variant_max_subcolumns_count) { TabletColumn subcol; subcol.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT); subcol.set_is_nullable(true); @@ -594,6 +597,7 @@ TabletColumn TabletColumn::create_materialized_variant_column(const std::string& vectorized::PathInData path(root, paths); subcol.set_path_info(path); subcol.set_name(path.get_path()); + subcol.set_variant_max_subcolumns_count(variant_max_subcolumns_count); return subcol; } @@ -658,6 +662,7 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const { ColumnPB* sparse_column = column->add_sparse_columns(); col->to_schema_pb(sparse_column); } + column->set_variant_max_subcolumns_count(_variant_max_subcolumns_count); } void TabletColumn::add_sub_column(TabletColumn& sub_column) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 5fb3deafd77..f18c488f5ad 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -107,7 +107,8 @@ public: // add them into tablet_schema for later column indexing. 
static TabletColumn create_materialized_variant_column(const std::string& root, const std::vector<std::string>& paths, - int32_t parent_unique_id); + int32_t parent_unique_id, + int32_t max_subcolumns_count); bool has_default_value() const { return _has_default_value; } std::string default_value() const { return _default_value; } size_t length() const { return _length; } @@ -199,6 +200,11 @@ public: return Status::OK(); } + void set_variant_max_subcolumns_count(int32_t variant_max_subcolumns_count) { + _variant_max_subcolumns_count = variant_max_subcolumns_count; + } + int32_t variant_max_subcolumns_count() const { return _variant_max_subcolumns_count; } + private: int32_t _unique_id = -1; std::string _col_name; @@ -247,6 +253,7 @@ private: // Use shared_ptr for reuse and reducing column memory usage std::vector<TabletColumnPtr> _sparse_cols; size_t _num_sparse_columns = 0; + int32_t _variant_max_subcolumns_count = 0; }; bool operator==(const TabletColumn& a, const TabletColumn& b); @@ -582,6 +589,7 @@ private: // ATTN: For compability reason empty cids means all columns of tablet schema are encoded to row column std::vector<int32_t> _row_store_column_unique_ids; bool _variant_enable_flatten_nested = false; + int64_t _vl_field_mem_size {0}; // variable length field }; diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp index 7b7154fb38a..7fecdc0a77d 100644 --- a/be/src/runtime/types.cpp +++ b/be/src/runtime/types.cpp @@ -63,6 +63,10 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx) } else { len = OLAP_STRING_MAX_LENGTH; } + } else if (type == TYPE_VARIANT) { + DCHECK(scalar_type.variant_max_subcolumns_count >= 0) + << "count is: " << scalar_type.variant_max_subcolumns_count; + variant_max_subcolumns_count = scalar_type.variant_max_subcolumns_count; } break; } @@ -162,6 +166,8 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) const { DCHECK_NE(scale, -1); scalar_type.__set_precision(precision); scalar_type.__set_scale(scale); + } else if (type == TYPE_VARIANT) { + scalar_type.__set_variant_max_subcolumns_count(variant_max_subcolumns_count); } } } @@ -206,6 +212,7 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const { } } else if (type == TYPE_VARIANT) { node->set_type(TTypeNodeType::VARIANT); + node->set_variant_max_subcolumns_count(variant_max_subcolumns_count); } } @@ -276,6 +283,7 @@ TypeDescriptor::TypeDescriptor(const google::protobuf::RepeatedPtrField<PTypeNod } case TTypeNodeType::VARIANT: { type = TYPE_VARIANT; + variant_max_subcolumns_count = node.variant_max_subcolumns_count(); break; } default: @@ -337,7 +345,8 @@ std::string TypeDescriptor::debug_string() const { return ss.str(); } case TYPE_VARIANT: - ss << "VARIANT"; + ss << "VARIANT" + << ", max subcolumns count: " << variant_max_subcolumns_count; return ss.str(); default: return type_to_string(type); diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 5d2f83c1bd6..16911e7b95a 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -65,16 +65,22 @@ struct TypeDescriptor { // Whether subtypes of a complex type is nullable std::vector<bool> contains_nulls; + // Only set if type == TYPE_VARIANT + int variant_max_subcolumns_count = 0; + TypeDescriptor() : type(INVALID_TYPE), len(-1), precision(-1), scale(-1) {} // explicit TypeDescriptor(PrimitiveType type) : - TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1), scale(-1) { + TypeDescriptor(PrimitiveType type, int variant_max_subcolumns_count_ = -1) + : type(type), len(-1), 
precision(-1), scale(-1) { if (type == TYPE_DECIMALV2) { precision = 27; scale = 9; } else if (type == TYPE_DATETIMEV2) { precision = 18; scale = 6; + } else if (type == TYPE_VARIANT) { + variant_max_subcolumns_count = variant_max_subcolumns_count_; } } @@ -181,6 +187,9 @@ struct TypeDescriptor { if (type == TYPE_DECIMALV2) { return precision == o.precision && scale == o.scale; } + if (type == TYPE_VARIANT) { + return variant_max_subcolumns_count == o.variant_max_subcolumns_count; + } return true; } @@ -261,6 +270,8 @@ struct TypeDescriptor { // use to struct type add sub type void add_sub_type(TypeDescriptor sub_type, std::string field_name, bool is_nullable = true); + int32_t max_subcolumns_count() const { return variant_max_subcolumns_count; } + private: /// Used to create a possibly nested type from the flattened Thrift representation. /// diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 1088284614b..28dbc59f383 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -51,8 +51,6 @@ #include "vec/aggregate_functions/helpers.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" -#include "vec/columns/column_map.h" -#include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" @@ -61,19 +59,11 @@ #include "vec/common/field_visitors.h" #include "vec/common/schema_util.h" #include "vec/common/string_buffer.hpp" -#include "vec/common/string_ref.h" #include "vec/core/column_with_type_and_name.h" -#include "vec/core/field.h" -#include "vec/core/types.h" #include "vec/data_types/convert_field_to_type.h" -#include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_factory.hpp" -#include "vec/data_types/data_type_jsonb.h" #include "vec/data_types/data_type_nothing.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_object.h" #include "vec/data_types/get_least_supertype.h" #include "vec/json/path_in_data.h" @@ -643,7 +633,7 @@ MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const { return finalized_object.apply_for_columns(std::forward<Func>(func)); } auto new_root = func(get_root())->assume_mutable(); - auto res = ColumnObject::create(get_root_type(), std::move(new_root)); + auto res = ColumnObject::create(_max_subcolumns_count, get_root_type(), std::move(new_root)); for (const auto& subcolumn : subcolumns) { if (subcolumn->data.is_root) { continue; @@ -806,28 +796,39 @@ ColumnObject::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_, boo : base_type->get_type_id(); } -ColumnObject::ColumnObject(bool is_nullable_) : is_nullable(is_nullable_), num_rows(0) { +ColumnObject::ColumnObject(int32_t max_subcolumns_count) + : is_nullable(true), num_rows(0), _max_subcolumns_count(max_subcolumns_count) { subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/)); ENABLE_CHECK_CONSISTENCY(this); } -ColumnObject::ColumnObject(DataTypePtr root_type, MutableColumnPtr&& root_column) - : is_nullable(true), num_rows(root_column->size()) { +ColumnObject::ColumnObject(int32_t max_subcolumns_count, DataTypePtr root_type, + MutableColumnPtr&& root_column) + : is_nullable(true), + num_rows(root_column->size()), + _max_subcolumns_count(max_subcolumns_count) { subcolumns.create_root( Subcolumn(std::move(root_column), root_type, is_nullable, true 
/*root*/)); serialized_sparse_column->insert_many_defaults(num_rows); ENABLE_CHECK_CONSISTENCY(this); } -ColumnObject::ColumnObject(Subcolumns&& subcolumns_) +ColumnObject::ColumnObject(int32_t max_subcolumns_count, Subcolumns&& subcolumns_) : is_nullable(true), subcolumns(std::move(subcolumns_)), - num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size()) { + num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size()), + _max_subcolumns_count(max_subcolumns_count) { + if (max_subcolumns_count && subcolumns_.size() > max_subcolumns_count + 1) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "unmatched max subcolumns count:, max subcolumns count: {}, but " + "subcolumns count: {}", + max_subcolumns_count, subcolumns_.size()); + } serialized_sparse_column->insert_many_defaults(num_rows); - ENABLE_CHECK_CONSISTENCY(this); } -ColumnObject::ColumnObject(size_t size) : is_nullable(true), num_rows(0) { +ColumnObject::ColumnObject(int32_t max_subcolumns_count, size_t size) + : is_nullable(true), num_rows(0), _max_subcolumns_count(max_subcolumns_count) { subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/)); insert_many_defaults(size); ENABLE_CHECK_CONSISTENCY(this); @@ -882,7 +883,7 @@ size_t ColumnObject::size() const { MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const { if (new_size == 0) { - return ColumnObject::create(is_nullable); + return ColumnObject::create(_max_subcolumns_count); } return apply_for_columns( [&](const ColumnPtr column) { return column->clone_resized(new_size); }); @@ -1299,10 +1300,11 @@ void ColumnObject::set_num_rows(size_t n) { } bool ColumnObject::try_add_new_subcolumn(const PathInData& path) { + DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " << _max_subcolumns_count; if (subcolumns.get_root() == nullptr || path.empty()) { throw Exception(ErrorCode::INTERNAL_ERROR, "column object has no root or path is empty"); } - if (subcolumns.size() < config::variant_max_subcolumns_count + 1) { + if (!_max_subcolumns_count || subcolumns.size() < _max_subcolumns_count + 1) { return add_sub_column(path, num_rows); } @@ -1954,6 +1956,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { ENABLE_CHECK_CONSISTENCY(this); return Status::OK(); } + DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " << _max_subcolumns_count; Subcolumns new_subcolumns; if (auto root = subcolumns.get_mutable_root(); root == nullptr) { @@ -1967,7 +1970,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { // 2. root column must be exsit in subcolumns bool need_pick_subcolumn_to_sparse_column = mode == FinalizeMode::WRITE_MODE && - subcolumns.size() > config::variant_max_subcolumns_count + 1; + (_max_subcolumns_count && subcolumns.size() > _max_subcolumns_count + 1); // finalize all subcolumns for (auto&& entry : subcolumns) { @@ -1978,8 +1981,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { const auto& least_common_type = entry->data.get_least_common_type(); // unnest all nested columns, add them to new_subcolumns - if (mode == FinalizeMode::WRITE_MODE && - least_common_type->equals(*ColumnObject::NESTED_TYPE)) { + if (mode == FinalizeMode::WRITE_MODE && least_common_type->equals(*NESTED_TYPE)) { unnest(entry, new_subcolumns); continue; } @@ -2017,8 +2019,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { [](const auto& a, const auto& b) { return a.second > b.second; }); // 3. 
pick config::variant_max_subcolumns_count selected subcolumns - for (size_t i = 0; - i < std::min(size_t(config::variant_max_subcolumns_count), sorted_by_size.size()); + for (size_t i = 0; i < std::min(size_t(_max_subcolumns_count), sorted_by_size.size()); ++i) { // if too many null values, then consider it as sparse column if (sorted_by_size[i].second < num_rows * 0.95) { @@ -2088,12 +2089,13 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { return finalized_object.filter(filter, count); } if (subcolumns.empty()) { - auto res = ColumnObject::create(count_bytes_in_filter(filter)); + auto res = ColumnObject::create(_max_subcolumns_count, count_bytes_in_filter(filter)); ENABLE_CHECK_CONSISTENCY(res.get()); return res; } auto new_root = get_root()->filter(filter, count)->assume_mutable(); - auto new_column = ColumnObject::create(get_root_type(), std::move(new_root)); + auto new_column = + ColumnObject::create(_max_subcolumns_count, get_root_type(), std::move(new_root)); for (auto& entry : subcolumns) { if (entry->data.is_root) { continue; diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 05e2693ed71..fcd275b89d5 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -46,9 +46,11 @@ #include "vec/core/field.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_jsonb.h" #include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_object.h" #include "vec/data_types/serde/data_type_serde.h" #include "vec/io/reader_buffer.h" #include "vec/json/path_in_data.h" @@ -268,18 +270,21 @@ private: WrappedPtr serialized_sparse_column = ColumnMap::create( ColumnString::create(), ColumnString::create(), ColumnArray::ColumnOffsets::create()); + int32_t _max_subcolumns_count = 0; + public: static constexpr auto COLUMN_NAME_DUMMY = "_dummy"; // always create root: data type nothing - explicit ColumnObject(bool is_nullable_); + explicit ColumnObject(int32_t max_subcolumns_count); // always create root: data type nothing - explicit ColumnObject(size_t size = 0); + explicit ColumnObject(int32_t max_subcolumns_count, size_t size); - explicit ColumnObject(DataTypePtr root_type, MutableColumnPtr&& root_column); + explicit ColumnObject(int32_t max_subcolumns_count, DataTypePtr root_type, + MutableColumnPtr&& root_column); - explicit ColumnObject(Subcolumns&& subcolumns_); + explicit ColumnObject(int32_t max_subcolumns_count, Subcolumns&& subcolumns_); ~ColumnObject() override = default; @@ -346,6 +351,8 @@ public: size_t rows() const { return num_rows; } + int32_t max_subcolumns_count() const { return _max_subcolumns_count; } + /// Adds a subcolumn from existing IColumn. 
bool add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn, DataTypePtr type); diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index fdae135cc81..2b205bbdb2a 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -162,10 +162,16 @@ Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, Co // nullable to Variant instead of the root of Variant // correct output: Nullable(Array(int)) -> Nullable(Variant(Nullable(Array(int)))) // incorrect output: Nullable(Array(int)) -> Nullable(Variant(Array(int))) - if (WhichDataType(remove_nullable(type)).is_variant_type()) { - // set variant root column/type to from column/type - auto variant = ColumnObject::create(true /*always nullable*/); + if (auto to_type = remove_nullable(type); WhichDataType(to_type).is_variant_type()) { + if (auto from_type = remove_nullable(arg.type); + WhichDataType(from_type).is_variant_type()) { + return Status::InternalError("Not support cast: from {} to {}", arg.type->get_name(), + type->get_name()); + } CHECK(arg.column->is_nullable()); + const auto& data_type_object = assert_cast<const DataTypeObject&>(*to_type); + auto variant = ColumnObject::create(data_type_object.variant_max_subcolumns_count()); + variant->create_root(arg.type, arg.column->assume_mutable()); ColumnPtr nullable = ColumnNullable::create( variant->get_ptr(), @@ -511,7 +517,7 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos, } if (scalar_root_column->is_column_string()) { - variant_column = ColumnObject::create(true); + variant_column = ColumnObject::create(var.max_subcolumns_count()); parse_json_to_variant(*variant_column.get(), assert_cast<const ColumnString&>(*scalar_root_column), config); } else { @@ -614,6 +620,7 @@ TabletColumn create_sparse_column(const TabletColumn& variant) { res.set_type(FieldType::OLAP_FIELD_TYPE_MAP); res.set_aggregation_method(variant.aggregation()); res.set_path_info(PathInData {SPARSE_COLUMN_PATH}); + res.set_parent_unique_id(variant.unique_id()); TabletColumn child_tcolumn; child_tcolumn.set_type(FieldType::OLAP_FIELD_TYPE_STRING); diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 2242db3f905..04b62497ae8 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -596,7 +596,9 @@ public: << " src type: " << block.get_by_position(i).type->get_name(); DCHECK(((DataTypeNullable*)_data_types[i].get()) ->get_nested_type() - ->equals(*block.get_by_position(i).type)); + ->equals(*block.get_by_position(i).type)) + << " target type: " << _data_types[i]->get_name() + << " src type: " << block.get_by_position(i).type->get_name(); DCHECK(!block.get_by_position(i).type->is_nullable()); _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) ->convert_to_full_column_if_const(), diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index 369809d77f6..cb0fb452bfe 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -102,6 +102,8 @@ DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, bool names.push_back(col_desc.get_sub_column(i).name()); } nested = std::make_shared<DataTypeStruct>(dataTypes, names); + } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_VARIANT) { + nested = std::make_shared<DataTypeObject>(col_desc.variant_max_subcolumns_count()); } else { nested = _create_primitive_data_type(col_desc.type(), 
col_desc.precision(), col_desc.frac()); @@ -163,7 +165,8 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo nested = std::make_shared<vectorized::DataTypeFloat64>(); break; case TYPE_VARIANT: - nested = std::make_shared<vectorized::DataTypeObject>("", true); + nested = + std::make_shared<vectorized::DataTypeObject>(col_desc.variant_max_subcolumns_count); break; case TYPE_STRING: case TYPE_CHAR: @@ -305,9 +308,6 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeIndex& type_index, bool case TypeIndex::String: nested = std::make_shared<vectorized::DataTypeString>(); break; - case TypeIndex::VARIANT: - nested = std::make_shared<vectorized::DataTypeObject>("", true); - break; case TypeIndex::Decimal32: nested = std::make_shared<DataTypeDecimal<Decimal32>>(BeConsts::MAX_DECIMAL32_PRECISION, 0); break; @@ -408,9 +408,6 @@ DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& type, case FieldType::OLAP_FIELD_TYPE_STRING: result = std::make_shared<vectorized::DataTypeString>(); break; - case FieldType::OLAP_FIELD_TYPE_VARIANT: - result = std::make_shared<vectorized::DataTypeObject>("", true); - break; case FieldType::OLAP_FIELD_TYPE_JSONB: result = std::make_shared<vectorized::DataTypeJsonb>(); break; @@ -489,7 +486,7 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { nested = std::make_shared<DataTypeString>(); break; case PGenericType::VARIANT: - nested = std::make_shared<DataTypeObject>("", true); + nested = std::make_shared<DataTypeObject>(pcolumn.variant_max_subcolumns_count()); break; case PGenericType::JSONB: nested = std::make_shared<DataTypeJsonb>(); diff --git a/be/src/vec/data_types/data_type_object.cpp b/be/src/vec/data_types/data_type_object.cpp index 5829554d118..551c30bd3c7 100644 --- a/be/src/vec/data_types/data_type_object.cpp +++ b/be/src/vec/data_types/data_type_object.cpp @@ -47,10 +47,17 @@ class IColumn; namespace doris::vectorized { #include "common/compile_check_begin.h" -DataTypeObject::DataTypeObject(const String& schema_format_, bool is_nullable_) - : schema_format(to_lower(schema_format_)), is_nullable(is_nullable_) {} +DataTypeObject::DataTypeObject(int32_t max_subcolumns_count) + : _max_subcolumns_count(max_subcolumns_count) {} bool DataTypeObject::equals(const IDataType& rhs) const { - return typeid_cast<const DataTypeObject*>(&rhs) != nullptr; + auto rhs_type = typeid_cast<const DataTypeObject*>(&rhs); + if (rhs_type && _max_subcolumns_count != rhs_type->variant_max_subcolumns_count()) { + VLOG_DEBUG << "_max_subcolumns_count is" << _max_subcolumns_count + << "rhs_type->variant_max_subcolumns_count()" + << rhs_type->variant_max_subcolumns_count(); + return false; + } + return rhs_type && _max_subcolumns_count == rhs_type->variant_max_subcolumns_count(); } int64_t DataTypeObject::get_uncompressed_serialized_bytes(const IColumn& column, @@ -182,7 +189,6 @@ const char* DataTypeObject::deserialize(const char* buf, MutableColumnPtr* colum // serialize num of rows, only take effect when subcolumns empty if (be_exec_version >= VARIANT_SERDE) { num_rows = *reinterpret_cast<const uint32_t*>(buf); - column_object->set_num_rows(num_rows); buf += sizeof(uint32_t); } @@ -192,6 +198,12 @@ const char* DataTypeObject::deserialize(const char* buf, MutableColumnPtr* colum buf = ColumnObject::get_sparse_column_type()->deserialize(buf, &sparse_column, be_exec_version); column_object->set_sparse_column(std::move(sparse_column)); + if (!root_added && column_object->get_subcolumn({})) { + 
column_object->get_subcolumn({})->insert_many_defaults(num_rows); + } + + column_object->set_num_rows(num_rows); + column_object->finalize(); #ifndef NDEBUG // DCHECK size @@ -212,4 +224,13 @@ void DataTypeObject::to_string(const IColumn& column, size_t row_num, BufferWrit static_cast<void>(variant.serialize_one_row_to_string(cast_set<Int32>(row_num), ostr)); } +void DataTypeObject::to_pb_column_meta(PColumnMeta* col_meta) const { + IDataType::to_pb_column_meta(col_meta); + col_meta->set_variant_max_subcolumns_count(_max_subcolumns_count); +} + +MutableColumnPtr DataTypeObject::create_column() const { + return ColumnObject::create(_max_subcolumns_count); +} + } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_object.h b/be/src/vec/data_types/data_type_object.h index 7723d1c5377..272f086fc83 100644 --- a/be/src/vec/data_types/data_type_object.h +++ b/be/src/vec/data_types/data_type_object.h @@ -34,7 +34,6 @@ #include "runtime/define_primitive_type.h" #include "runtime/types.h" #include "serde/data_type_object_serde.h" -#include "vec/columns/column_object.h" #include "vec/common/assert_cast.h" #include "vec/core/field.h" #include "vec/core/types.h" @@ -50,22 +49,21 @@ class IColumn; namespace doris::vectorized { class DataTypeObject : public IDataType { private: - String schema_format; - bool is_nullable; + int32_t _max_subcolumns_count = 0; public: - DataTypeObject(const String& schema_format_ = "json", bool is_nullable_ = true); + DataTypeObject() {} + DataTypeObject(int32_t max_subcolumns_count); const char* get_family_name() const override { return "Variant"; } TypeIndex get_type_id() const override { return TypeIndex::VARIANT; } TypeDescriptor get_type_as_type_descriptor() const override { - return TypeDescriptor(TYPE_VARIANT); + return TypeDescriptor(TYPE_VARIANT, _max_subcolumns_count); } doris::FieldType get_storage_field_type() const override { return doris::FieldType::OLAP_FIELD_TYPE_VARIANT; } - MutableColumnPtr create_column() const override { return ColumnObject::create(is_nullable); } - bool is_object() const override { return true; } + MutableColumnPtr create_column() const override; bool equals(const IDataType& rhs) const override; bool have_subtypes() const override { return true; }; int64_t get_uncompressed_serialized_bytes(const IColumn& column, @@ -93,5 +91,7 @@ public: DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override { return std::make_shared<DataTypeObjectSerDe>(nesting_level); }; + void to_pb_column_meta(PColumnMeta* col_meta) const override; + int32_t variant_max_subcolumns_count() const { return _max_subcolumns_count; } }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/get_least_supertype.cpp b/be/src/vec/data_types/get_least_supertype.cpp index 82bea452923..a0f27482b5a 100644 --- a/be/src/vec/data_types/get_least_supertype.cpp +++ b/be/src/vec/data_types/get_least_supertype.cpp @@ -281,10 +281,6 @@ void get_least_supertype_jsonb(const TypeIndexSet& types, DataTypePtr* type) { *type = std::make_shared<DataTypeJsonb>(); return; } - if (which.is_variant_type()) { - *type = std::make_shared<DataTypeObject>(); - return; - } if (which.is_date_v2()) { *type = std::make_shared<DataTypeDateV2>(); return; diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 2cb8bae915d..0e0274ab794 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -47,24 +47,12 @@ Status 
DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column, int64_t row_idx, bool col_const, const FormatOptions& options) const { const auto& variant = assert_cast<const ColumnObject&>(column); - if (variant.is_scalar_variant()) { - // Serialize scalar types, like int, string, array, faster path - const auto& root = variant.get_subcolumn({}); - RETURN_IF_ERROR(root->get_least_common_type_serde()->write_column_to_mysql( - root->get_finalized_column(), row_buffer, row_idx, col_const, options)); - } else { - // Serialize hierarchy types to json format - std::string buffer; - bool is_null = false; - if (!variant.serialize_one_row_to_string(row_idx, &buffer)) { - return Status::InternalError("Invalid json format"); - } - if (is_null) { - row_buffer.push_null(); - } else { - row_buffer.push_string(buffer.data(), buffer.size()); - } + // Serialize hierarchy types to json format + std::string buffer; + if (!variant.serialize_one_row_to_string(row_idx, &buffer)) { + return Status::InternalError("Invalid json format"); } + row_buffer.push_string(buffer.data(), buffer.size()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 4c0b30e440e..a822417e2b8 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -430,7 +430,8 @@ Status NewOlapScanner::_init_variant_columns() { // add them into tablet_schema for later column indexing. TabletColumn subcol = TabletColumn::create_materialized_variant_column( tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), - slot->column_paths(), slot->col_unique_id()); + slot->column_paths(), slot->col_unique_id(), + slot->type().max_subcolumns_count()); if (tablet_schema->field_index(*subcol.path_info_ptr()) < 0) { tablet_schema->append_column(subcol, TabletSchema::ColumnType::VARIANT); } diff --git a/be/src/vec/functions/array/function_array_utils.cpp b/be/src/vec/functions/array/function_array_utils.cpp index ab999aa21cc..89e1d9b860b 100644 --- a/be/src/vec/functions/array/function_array_utils.cpp +++ b/be/src/vec/functions/array/function_array_utils.cpp @@ -50,7 +50,16 @@ bool extract_column_array_info(const IColumn& src, ColumnArrayExecutionData& dat if (data.nested_col->is_nullable()) { const auto& nested_null_col = reinterpret_cast<const ColumnNullable&>(*data.nested_col); data.nested_nullmap_data = nested_null_col.get_null_map_data().data(); - data.nested_col = nested_null_col.get_nested_column_ptr().get(); + data.nested_col = nested_null_col.get_nested_column_ptr(); + } + if (data.output_as_variant && + !WhichDataType(remove_nullable(data.nested_type)).is_variant_type()) { + // set variant root column/type to from column/type + const auto& data_type_object = + assert_cast<const DataTypeObject&>(*remove_nullable(data.nested_type)); + auto variant = ColumnObject::create(data_type_object.variant_max_subcolumns_count()); + variant->create_root(data.nested_type, make_nullable(data.nested_col)->assume_mutable()); + data.nested_col = variant->get_ptr(); } return true; } diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 5de820dfa3a..9608c74a07e 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -1926,12 +1926,13 @@ private: static Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, const uint32_t result, size_t input_rows_count) { - // auto& data_type_to = block.get_by_position(result).type; + 
auto& data_type_to = block.get_by_position(result).type; const auto& col_with_type_and_name = block.get_by_position(arguments[0]); auto& from_type = col_with_type_and_name.type; auto& col_from = col_with_type_and_name.column; // set variant root column/type to from column/type - auto variant = ColumnObject::create(true /*always nullable*/); + const auto& data_type_object = assert_cast<const DataTypeObject&>(*data_type_to); + auto variant = ColumnObject::create(data_type_object.variant_max_subcolumns_count()); variant->create_root(from_type, col_from->assume_mutable()); block.replace_by_position(result, std::move(variant)); return Status::OK(); @@ -2245,10 +2246,10 @@ private: // variant needs to be judged first if (to_type->get_type_id() == TypeIndex::VARIANT) { - return create_variant_wrapper(from_type, static_cast<const DataTypeObject&>(*to_type)); + return create_variant_wrapper(from_type, assert_cast<const DataTypeObject&>(*to_type)); } if (from_type->get_type_id() == TypeIndex::VARIANT) { - return create_variant_wrapper(static_cast<const DataTypeObject&>(*from_type), to_type); + return create_variant_wrapper(assert_cast<const DataTypeObject&>(*from_type), to_type); } switch (from_type->get_type_id()) { diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp index d549c36595f..65d788d3783 100644 --- a/be/src/vec/functions/function_variant_element.cpp +++ b/be/src/vec/functions/function_variant_element.cpp @@ -74,7 +74,10 @@ public: DCHECK(is_string(arguments[1])) << "Second argument for function: " << name << " should be String but it has type " << arguments[1]->get_name() << "."; - return make_nullable(std::make_shared<DataTypeObject>()); + auto arg_variant = remove_nullable(arguments[0]); + const auto& data_type_object = assert_cast<const DataTypeObject&>(*arg_variant); + return make_nullable( + std::make_shared<DataTypeObject>(data_type_object.variant_max_subcolumns_count())); } // wrap variant column with nullable @@ -120,11 +123,18 @@ public: } private: + // Return sub-path by specified prefix. 
+ // For example, for prefix a.b: + // a.b.c.d -> c.d, a.b.c -> c + static std::string_view get_sub_path(const std::string_view& path, + const std::string_view& prefix) { + return path.substr(prefix.size() + 1); + } static Status get_element_column(const ColumnObject& src, const ColumnPtr& index_column, ColumnPtr* result) { std::string field_name = index_column->get_data_at(0).to_string(); if (src.empty()) { - *result = ColumnObject::create(true); + *result = ColumnObject::create(src.max_subcolumns_count()); // src subcolumns empty but src row count may not be 0 (*result)->assume_mutable()->insert_many_defaults(src.size()); (*result)->assume_mutable()->finalize(); @@ -151,7 +161,8 @@ private: result_column->insert_default(); } } - *result = ColumnObject::create(type, std::move(result_column)); + *result = ColumnObject::create(src.max_subcolumns_count(), type, + std::move(result_column)); (*result)->assume_mutable()->finalize(); return Status::OK(); } else { @@ -160,13 +171,69 @@ private: PathInData path(field_name); ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns(); const auto* node = subcolumns.find_exact(path); - MutableColumnPtr result_col; + MutableColumnPtr result_col = ColumnObject::create(src.max_subcolumns_count()); + ColumnObject::Subcolumns new_subcolumns; + + auto extract_from_sparse_column = [&](auto& container) { + ColumnObject::Subcolumn root {0, true, true}; + // no root, no sparse column + const auto& sparse_data_map = + assert_cast<const ColumnMap&>(*mutable_ptr->get_sparse_column()); + const auto& src_sparse_data_offsets = sparse_data_map.get_offsets(); + const auto& src_sparse_data_paths = + assert_cast<const ColumnString&>(sparse_data_map.get_keys()); + const auto& src_sparse_data_values = + assert_cast<const ColumnString&>(sparse_data_map.get_values()); + auto& sparse_data_offsets = + assert_cast<ColumnMap&>(*container->get_sparse_column()->assume_mutable()) + .get_offsets(); + auto [sparse_data_paths, sparse_data_values] = + container->get_sparse_data_paths_and_values(); + StringRef prefix_ref(path.get_path()); + std::string_view path_prefix(prefix_ref.data, prefix_ref.size); + for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) { + size_t start = src_sparse_data_offsets[ssize_t(i) - 1]; + size_t end = src_sparse_data_offsets[ssize_t(i)]; + size_t lower_bound_index = + vectorized::ColumnObject::find_path_lower_bound_in_sparse_data( + prefix_ref, src_sparse_data_paths, start, end); + for (; lower_bound_index != end; ++lower_bound_index) { + auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index); + std::string_view path(path_ref.data, path_ref.size); + if (!path.starts_with(path_prefix)) { + break; + } + // Don't include path that is equal to the prefix. 
+ if (path.size() != path_prefix.size()) { + auto sub_path = get_sub_path(path, path_prefix); + sparse_data_paths->insert_data(sub_path.data(), sub_path.size()); + sparse_data_values->insert_from(src_sparse_data_values, + lower_bound_index); + } else { + // insert into root column, example: access v['b'] and b is in sparse column + // data example: + // {"b" : 123} + // {"b" : {"c" : 456}} + // b maybe in sparse column, and b.c is in subolumn, put `b` into root column to distinguish + // from "" which is empty path and root + const auto& data = ColumnObject::deserialize_from_sparse_column( + &src_sparse_data_values, lower_bound_index); + root.insert(data.first, data.second); + } + } + if (root.size() == sparse_data_offsets.size()) { + root.insert_default(); + } + sparse_data_offsets.push_back(sparse_data_paths->size()); + } + container->get_subcolumns().create_root(root); + container->set_num_rows(mutable_ptr->size()); + }; + if (node != nullptr) { - result_col = ColumnObject::create(true); std::vector<decltype(node)> nodes; PathsInData paths; ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths); - ColumnObject::Subcolumns new_subcolumns; for (const auto* n : nodes) { PathInData new_path = n->path.copy_pop_front(); VLOG_DEBUG << "add node " << new_path.get_path() @@ -178,19 +245,28 @@ private: VLOG_DEBUG << "failed to add node " << new_path.get_path(); } } + // handle the root node if (new_subcolumns.empty() && !nodes.empty()) { CHECK_EQ(nodes.size(), 1); new_subcolumns.create_root(ColumnObject::Subcolumn { nodes[0]->data.get_finalized_column_ptr()->assume_mutable(), nodes[0]->data.get_least_common_type(), true, true}); + auto container = ColumnObject::create(src.max_subcolumns_count(), + std::move(new_subcolumns)); + result_col->insert_range_from(*container, 0, container->size()); + } else { + auto container = ColumnObject::create(src.max_subcolumns_count(), + std::move(new_subcolumns)); + container->clear_sparse_column(); + extract_from_sparse_column(container); + result_col->insert_range_from(*container, 0, container->size()); } - auto container = ColumnObject::create(std::move(new_subcolumns)); - result_col->insert_range_from(*container, 0, container->size()); } else { - // Create with root, otherwise the root type maybe type Nothing ? - result_col = ColumnObject::create(true); - result_col->insert_many_defaults(src.size()); + auto container = + ColumnObject::create(src.max_subcolumns_count(), std::move(new_subcolumns)); + extract_from_sparse_column(container); + result_col->insert_range_from(*container, 0, container->size()); } *result = result_col->get_ptr(); (*result)->assume_mutable()->finalize(); diff --git a/be/test/vec/columns/column_object_test.cpp b/be/test/vec/columns/column_object_test.cpp index e59219827db..a6b68f6b972 100644 --- a/be/test/vec/columns/column_object_test.cpp +++ b/be/test/vec/columns/column_object_test.cpp @@ -45,7 +45,7 @@ doris::vectorized::Field construct_variant_map( auto construct_basic_varint_column() { // 1. 
create an empty variant column - auto variant = ColumnObject::create(); + auto variant = ColumnObject::create(5); std::vector<std::pair<std::string, doris::vectorized::Field>> data; @@ -85,7 +85,7 @@ auto construct_dst_varint_column() { vectorized::ColumnObject::Subcolumn {0, true}); dynamic_subcolumns.add(vectorized::PathInData("v.c.d"), vectorized::ColumnObject::Subcolumn {0, true}); - return ColumnObject::create(std::move(dynamic_subcolumns)); + return ColumnObject::create(5, std::move(dynamic_subcolumns)); } TEST(ColumnVariantTest, basic_finalize) { @@ -331,7 +331,7 @@ doris::vectorized::Field get_jsonb_field(std::string_view type) { auto construct_advanced_varint_column() { // 1. create an empty variant column - auto variant = ColumnObject::create(); + auto variant = ColumnObject::create(5); std::vector<std::pair<std::string, doris::vectorized::Field>> data; @@ -609,7 +609,7 @@ TEST(ColumnVariantTest, advanced_insert_range_from) { auto construct_varint_column_only_subcolumns() { // 1. create an empty variant column - auto variant = ColumnObject::create(); + auto variant = ColumnObject::create(5); std::vector<std::pair<std::string, doris::vectorized::Field>> data; @@ -631,7 +631,7 @@ auto construct_varint_column_only_subcolumns() { auto construct_varint_column_more_subcolumns() { // 1. create an empty variant column - auto variant = ColumnObject::create(); + auto variant = ColumnObject::create(5); std::vector<std::pair<std::string, doris::vectorized::Field>> data; @@ -657,7 +657,7 @@ TEST(ColumnVariantTest, empty_inset_range_from) { EXPECT_EQ(src->size(), 6); // dst is an empty column - auto dst = ColumnObject::create(); + auto dst = ColumnObject::create(5); // subcolumn->subcolumn v.a v.b v.c v.f v.e dst->insert_range_from(*src, 0, 6); diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java index e9f1b50c0df..54af508fd95 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java @@ -124,6 +124,9 @@ public class ScalarType extends Type { @SerializedName(value = "lenStr") private String lenStr; + @SerializedName(value = "variantMaxSubcolumnsCount") + private int variantMaxSubcolumnsCount; + public ScalarType(PrimitiveType type) { this.type = type; } @@ -727,11 +730,17 @@ public class ScalarType extends Type { case CHAR: case HLL: case STRING: - case JSONB: - case VARIANT: { + case JSONB: { scalarType.setLen(getLength()); break; } + case VARIANT: { + scalarType.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount); + if (variantMaxSubcolumnsCount < 0) { + throw new IllegalArgumentException(String.format("error count: %d", variantMaxSubcolumnsCount)); + } + break; + } case DECIMALV2: case DECIMAL32: case DECIMAL64: @@ -913,6 +922,9 @@ public class ScalarType extends Type { if (isDatetimeV2() && scalarType.isDatetimeV2()) { return true; } + if (isVariantType() && scalarType.isVariantType()) { + return true; + } return false; } @@ -943,6 +955,9 @@ public class ScalarType extends Type { if (type.isDecimalV2Type() || type == PrimitiveType.DATETIMEV2 || type == PrimitiveType.TIMEV2) { return precision == other.precision && scale == other.scale; } + if (this.isVariantType() && other.isVariantType()) { + return this.getVariantMaxSubcolumnsCount() == other.getVariantMaxSubcolumnsCount(); + } return true; } @@ -1128,6 +1143,14 @@ public class ScalarType extends Type { return finalType; } + if 
(t1.isVariantType() && t2.isVariantType()) { + if (t1.getVariantMaxSubcolumnsCount() == t2.getVariantMaxSubcolumnsCount()) { + return t1; + } else { + return Type.UNSUPPORTED; + } + } + PrimitiveType smallerType = (t1.type.ordinal() < t2.type.ordinal() ? t1.type : t2.type); PrimitiveType largerType = @@ -1213,4 +1236,13 @@ public class ScalarType extends Type { result = 31 * result + scale; return result; } + + public void setVariantMaxSubcolumnsCount(int variantMaxSubcolumnsCount) { + this.variantMaxSubcolumnsCount = variantMaxSubcolumnsCount; + LOG.info("set max count is: {}", variantMaxSubcolumnsCount); + } + + public int getVariantMaxSubcolumnsCount() { + return variantMaxSubcolumnsCount; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 8d257bc2d78..bfe16022472 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -63,10 +63,12 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaContext; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -1020,6 +1022,11 @@ public class SchemaChangeHandler extends AlterHandler { lightSchemaChange = false; } + Type type = newColumn.getType(); + if (type.isVariantType()) { + ScalarType scType = (ScalarType) type; + scType.setVariantMaxSubcolumnsCount(olapTable.getVariantMaxSubcolumnsCount()); + } // check if the new column already exist in base schema. // do not support adding new column which already exist in base schema. 
List<Column> baseSchema = olapTable.getBaseSchema(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java index de257991ca6..cc885ca3e8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java @@ -158,6 +158,9 @@ public class CastExpr extends Expr { if (from.isComplexType() && type.isJsonbType()) { nullableMode = Function.NullableMode.ALWAYS_NULLABLE; } + if (from.isVariantType() || type.isVariantType()) { + nullableMode = Function.NullableMode.DEPEND_ON_ARGUMENT; + } Preconditions.checkState(nullableMode != null, "cannot find nullable node for cast from " + from + " to " + to); fn = new Function(new FunctionName(getFnName(type)), Lists.newArrayList(e.type), type, diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 53bb2ba95ac..63b346e0a7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -2032,6 +2032,9 @@ public class FunctionCallExpr extends Expr { } else if (children.size() == 1) { this.type = ScalarType.createDatetimeV2Type(6); } + } else if (fn.getFunctionName().getFunction().equalsIgnoreCase("element_at") + && getChild(0).type.isVariantType()) { + this.type = getChild(0).type; } else { this.type = fn.getReturnType(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java index a9e11458582..41d284cfe54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java @@ -195,6 +195,12 @@ public class MVColumnItem { result.setIsAllowNull(defineExpr.isNullable()); } } + if (result.getType().isVariantType()) { + ScalarType variantType = (ScalarType) this.getType(); + if (variantType.getVariantMaxSubcolumnsCount() != olapTable.getVariantMaxSubcolumnsCount()) { + throw new DdlException("MVColumnItem variantType is error"); + } + } result.setName(name); result.setAggregationType(aggregationType, isAggregationTypeImplicit); result.setDefineExpr(defineExpr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 916a213027f..f73e6ee73ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -610,6 +610,10 @@ public class Column implements GsonPostProcessable { tColumnType.setScale(this.getScale()); tColumnType.setIndexLen(this.getOlapColumnIndexSize()); + if (this.getType().isVariantType()) { + ScalarType variantType = (ScalarType) this.getType(); + tColumnType.setVariantMaxSubcolumnsCount(variantType.getVariantMaxSubcolumnsCount()); + } tColumn.setColumnType(tColumnType); if (null != this.aggregationType) { @@ -832,6 +836,9 @@ public class Column implements GsonPostProcessable { for (Column c : childrenColumns) { builder.addChildrenColumns(c.toPb(Sets.newHashSet(), Lists.newArrayList())); } + } else if (this.type.isVariantType()) { + ScalarType variantType = (ScalarType) this.getType(); + builder.setVariantMaxSubcolumnsCount(variantType.getVariantMaxSubcolumnsCount()); } OlapFile.ColumnPB col = builder.build(); 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 7597a7d256b..1745d2de68e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3729,6 +3729,12 @@ public class Env { sb.append(olapTable.variantEnableFlattenNested()).append("\""); } + // variant max subcolumns count + if (olapTable.getVariantMaxSubcolumnsCount() != 0) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT).append("\" = \""); + sb.append(olapTable.getVariantMaxSubcolumnsCount()).append("\""); + } + // binlog if (Config.enable_feature_binlog) { BinlogConfig binlogConfig = olapTable.getBinlogConfig(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index ec3bd2acbc5..e924e688ef8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2466,6 +2466,22 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return false; } + public void setVariantMaxSubcolumnsCount(int maxSubcoumnsCount) { + getOrCreatTableProperty().setVariantMaxSubcolumnsCount(maxSubcoumnsCount); + List<Column> columns = getBaseSchema(true); + for (Column column : columns) { + Type type = column.getType(); + if (type.isVariantType()) { + ScalarType scType = (ScalarType) type; + scType.setVariantMaxSubcolumnsCount(maxSubcoumnsCount); + } + } + } + + public int getVariantMaxSubcolumnsCount() { + return getOrCreatTableProperty().getVariantMaxSubcolumnsCount(); + } + public int getBaseSchemaVersion() { MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId); return baseIndexMeta.getSchemaVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 1ac556c6846..f08f632853c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -283,6 +283,16 @@ public class TableProperty implements Writable, GsonPostProcessable { return variantEnableFlattenNested; } + public void setVariantMaxSubcolumnsCount(int maxSubcoumnsCount) { + properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT, Integer.toString(maxSubcoumnsCount)); + } + + public int getVariantMaxSubcolumnsCount() { + return Integer.parseInt(properties.getOrDefault( + PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT, + Integer.toString(PropertyAnalyzer.VARIANT_MAX_SUBCOLUMNS_COUNT_DEFAULT_VALUE))); + } + public TableProperty buildEnableSingleReplicaCompaction() { enableSingleReplicaCompaction = Boolean.parseBoolean( properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION, "false")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 5721db0c27e..af385fb6b82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -240,6 +240,9 @@ public class PropertyAnalyzer { public static final long TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5; public static final long 
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1; + public static final String PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT = "variant_max_subcolumns_count"; + public static final int VARIANT_MAX_SUBCOLUMNS_COUNT_DEFAULT_VALUE = 0; + public enum RewriteType { PUT, // always put property REPLACE, // replace if exists property @@ -1716,4 +1719,23 @@ } return properties; } + + public static int analyzeVariantMaxSubcolumnsCount(Map<String, String> properties, int defaultValue) + throws AnalysisException { + int maxSubcolumnsCount = defaultValue; + if (properties != null && properties.containsKey(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT)) { + String maxSubcolumnsCountStr = properties.get(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT); + try { + maxSubcolumnsCount = Integer.parseInt(maxSubcolumnsCountStr); + if (maxSubcolumnsCount < 0 || maxSubcolumnsCount > 10000) { + throw new AnalysisException("variant max subcolumns count must be between 0 and 10000"); + } + } catch (NumberFormatException e) { + throw new AnalysisException("variant max subcolumns count must be an integer"); + } + + properties.remove(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT); + } + return maxSubcolumnsCount; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 3a9e96bade6..ee612fe5702 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1634,6 +1634,10 @@ public class InternalCatalog implements CatalogIf<Database> { properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_ENABLE_FLATTEN_NESTED, olapTable.variantEnableFlattenNested().toString()); } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT)) { + properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT, + Integer.toString(olapTable.getVariantMaxSubcolumnsCount())); + } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)) { properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION, olapTable.enableSingleReplicaCompaction().toString()); @@ -3063,6 +3067,16 @@ public class InternalCatalog implements CatalogIf<Database> { } Preconditions.checkNotNull(versionInfo); + int variantMaxSubcolumnsCount = ConnectContext.get() == null ?
0 : ConnectContext.get() + .getSessionVariable().getGlobalVariantMaxSubcolumnsCount(); + try { + variantMaxSubcolumnsCount = PropertyAnalyzer + .analyzeVariantMaxSubcolumnsCount(properties, variantMaxSubcolumnsCount); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount); + // a set to record every new tablet created when create table // if failed in any step, use this set to do clear things Set<Long> tabletIdSet = new HashSet<>(); @@ -3274,6 +3288,7 @@ public class InternalCatalog implements CatalogIf<Database> { throw t; } } + return tableHasExist; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java index 414dac1c95d..7e43bdc5401 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java @@ -307,7 +307,7 @@ public class VariantSubPathPruning extends DefaultPlanRewriter<PruneContext> imp } SlotReference outputSlot = new SlotReference(StatementScopeIdGenerator.newExprId(), - entry.getValue().get(0).getName(), VariantType.INSTANCE, + entry.getValue().get(0).getName(), entry.getValue().get(0).getDataType(), true, ImmutableList.of(), null, null, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java index ea6997e8482..ba902847e92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java @@ -114,6 +114,7 @@ public interface ComputeSignature extends FunctionTrait, ImplicitCastInputTypes .then(ComputeSignatureHelper::implementFollowToArgumentReturnType) .then(ComputeSignatureHelper::normalizeDecimalV2) .then(ComputeSignatureHelper::dynamicComputePropertiesOfArray) + .then(ComputeSignatureHelper::dynamicComputeVariantArgs) .get(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java index 166f1c9db7f..662185bc8eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.types.DecimalV3Type; import org.apache.doris.nereids.types.MapType; import org.apache.doris.nereids.types.NullType; import org.apache.doris.nereids.types.StructType; +import org.apache.doris.nereids.types.VariantType; import org.apache.doris.nereids.types.coercion.AnyDataType; import org.apache.doris.nereids.types.coercion.FollowToAnyDataType; import org.apache.doris.nereids.types.coercion.FollowToArgumentType; @@ -428,6 +429,34 @@ public class ComputeSignatureHelper { return signature; } + /** dynamicComputeVariantArgs */ + public static FunctionSignature dynamicComputeVariantArgs( + FunctionSignature signature, List<Expression> arguments) { + List<DataType> newArgTypes = Lists.newArrayListWithCapacity(arguments.size()); + boolean findVariantType = 
false; + for (int i = 0; i < arguments.size(); i++) { + DataType sigType; + if (i >= signature.argumentsTypes.size()) { + sigType = signature.getVarArgType().orElseThrow( + () -> new AnalysisException("function arity not match with signature")); + } else { + sigType = signature.argumentsTypes.get(i); + } + DataType expressionType = arguments.get(i).getDataType(); + if (sigType instanceof VariantType && expressionType instanceof VariantType) { + newArgTypes.add(expressionType); + signature = signature.withReturnType(expressionType); + findVariantType = true; + } else { + newArgTypes.add(sigType); + } + } + if (findVariantType) { + signature = signature.withArgumentTypes(signature.hasVarArgs, newArgTypes); + } + return signature; + } + private static FunctionSignature defaultDecimalV3PrecisionPromotion( FunctionSignature signature, List<Expression> arguments) { DecimalV3Type finalType = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java index 62d7eb72e20..c0e60fdf135 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java @@ -36,7 +36,7 @@ import java.util.List; public class ExplodeVariantArray extends TableGeneratingFunction implements UnaryExpression, AlwaysNullable { public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( - FunctionSignature.ret(new VariantType()).args(new VariantType()) + FunctionSignature.ret(new VariantType(0)).args(new VariantType(0)) ); /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java index c8d54189d37..ed62fab5ad9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java @@ -45,8 +45,8 @@ public class ElementAt extends ScalarFunction public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( FunctionSignature.ret(new FollowToAnyDataType(0)) .args(ArrayType.of(new AnyDataType(0)), BigIntType.INSTANCE), - FunctionSignature.ret(new VariantType()) - .args(new VariantType(), VarcharType.SYSTEM_DEFAULT), + FunctionSignature.ret(new VariantType(0)) + .args(new VariantType(0), VarcharType.SYSTEM_DEFAULT), FunctionSignature.ret(new FollowToAnyDataType(1)) .args(MapType.of(new AnyDataType(0), new AnyDataType(1)), new FollowToAnyDataType(0)) ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java index 1d5f69c5366..b666f898135 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java @@ -351,7 +351,10 @@ public abstract class DataType { case CHAR: return CharType.createCharType(type.getLength()); case VARCHAR: return VarcharType.createVarcharType(type.getLength()); case STRING: return StringType.INSTANCE; - case VARIANT: return VariantType.INSTANCE; + case VARIANT: { + ScalarType scType = (ScalarType) type; + return new 
VariantType(scType.getVariantMaxSubcolumnsCount()); + } case JSONB: return JsonType.INSTANCE; case IPV4: return IPv4Type.INSTANCE; case IPV6: return IPv6Type.INSTANCE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java index 63752594998..a115c1bd25b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.types; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.nereids.annotation.Developing; import org.apache.doris.nereids.types.coercion.PrimitiveType; @@ -31,13 +32,25 @@ import java.util.Objects; @Developing public class VariantType extends PrimitiveType { - public static final VariantType INSTANCE = new VariantType(); + public static final VariantType INSTANCE = new VariantType(0); public static final int WIDTH = 24; + private int variantMaxSubcolumnsCount = 0; + + // public static createVariantType(int variantMaxSubcolumnsCount) { + // return new VariantType(variantMaxSubcolumnsCount); + // } + + public VariantType(int variantMaxSubcolumnsCount) { + this.variantMaxSubcolumnsCount = variantMaxSubcolumnsCount; + } + @Override public Type toCatalogDataType() { - return Type.VARIANT; + ScalarType type = ScalarType.createVariantType(); + type.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount); + return type; } @Override @@ -58,7 +71,8 @@ public class VariantType extends PrimitiveType { if (o == null || getClass() != o.getClass()) { return false; } - return super.equals(o); + VariantType other = (VariantType) o; + return this.variantMaxSubcolumnsCount == other.variantMaxSubcolumnsCount; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2c09ee73f57..7640ce12830 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -693,6 +693,8 @@ public class SessionVariable implements Serializable, Writable { */ public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite"; + public static final String GLOBAL_VARIANT_SUBCOLUMNS_COUNT = "global_variant_max_subcolumns_count"; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. 
*/ @@ -2323,6 +2325,14 @@ }) public boolean skipCheckingAcidVersionFile = false; + @VariableMgr.VarAttr( + name = GLOBAL_VARIANT_SUBCOLUMNS_COUNT, + needForward = true, + checker = "checkGlobalVariantMaxSubcolumnsCount", + fuzzy = true + ) + public int globalVariantMaxSubcolumnsCount = 5; + public void setEnableEsParallelScroll(boolean enableESParallelScroll) { this.enableESParallelScroll = enableESParallelScroll; } @@ -2362,6 +2372,7 @@ this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); // this.enableHashJoinEarlyStartProbe = random.nextBoolean(); this.enableParallelResultSink = random.nextBoolean(); + this.globalVariantMaxSubcolumnsCount = random.nextInt(10); int randomInt = random.nextInt(4); if (randomInt % 2 == 0) { this.rewriteOrToInPredicateThreshold = 100000; @@ -3674,6 +3685,14 @@ } } + public void checkGlobalVariantMaxSubcolumnsCount(String variantMaxSubcolumnsCount) { + int value = Integer.valueOf(variantMaxSubcolumnsCount); + if (value < 0 || value > 10000) { + throw new UnsupportedOperationException( + "variant max subcolumns count is: " + variantMaxSubcolumnsCount + ", it must be between 0 and 10000"); + } + } + public void checkQueryTimeoutValid(String newQueryTimeout) { int value = Integer.valueOf(newQueryTimeout); if (value <= 0) { @@ -4570,4 +4589,8 @@ public boolean getDisableInvertedIndexV1ForVaraint() { return disableInvertedIndexV1ForVaraint; } + + public int getGlobalVariantMaxSubcolumnsCount() { + return globalVariantMaxSubcolumnsCount; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index a9c1612eb48..e706951e783 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -45,7 +45,6 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; -import org.apache.doris.catalog.VariantType; import org.apache.doris.cloud.qe.ComputeGroupException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; @@ -718,7 +717,7 @@ public class StatisticsUtil { return type instanceof ArrayType || type instanceof StructType || type instanceof MapType - || type instanceof VariantType + || type.isVariantType() || type instanceof AggStateType; } diff --git a/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java b/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java index 2e6ace105b1..4f81182837a 100644 --- a/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java +++ b/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java @@ -324,6 +324,8 @@ public class TrinoLogicalPlanBuilder extends io.trino.sql.tree.AstVisitor<Objec } else if (dataType instanceof io.trino.sql.tree.DateTimeDataType) { // TODO: support date data type mapping throw new DialectTransformException("transform date data type"); + } else 
if("variant".eqluals(typeName)) { + throw new DialectTransformException("transform variant data type"); } throw new AnalysisException("Nereids do not support type: " + dataType); } diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index 9b3824db3dc..26c0c9f0c31 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -63,6 +63,7 @@ message PColumnMeta { optional bool result_is_nullable = 6; optional string function_name = 7; optional int32 be_exec_version = 8; + optional int32 variant_max_subcolumns_count = 9 [default = 0]; } message PBlock { diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 259f9f2861a..b57cf47c2c6 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -321,6 +321,7 @@ message ColumnPB { // only reference by variant sparse columns optional int32 parent_unique_id = 23; optional int32 be_exec_version = 24; + optional int32 variant_max_subcolumns_count = 25 [default = 0]; } // Dictionary of Schema info, to reduce TabletSchemaCloudPB fdb kv size diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index dee4a81d3bb..c51982a8dab 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -204,6 +204,7 @@ message ColumnMetaPB { optional string function_name = 19; // used on agg_state type optional int32 be_exec_version = 20; // used on agg_state type optional VariantStatisticsPB variant_statistics = 21; // only used in variant type + optional int32 variant_max_subcolumns_count = 22 [default = 0]; } message PrimaryKeyIndexMetaPB { diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index 012434dc3bc..c6beb626e96 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -53,6 +53,9 @@ message PTypeNode { optional bool contains_null = 4; // update for map/struct type repeated bool contains_nulls = 5; + + // only used for VARIANT + optional int32 variant_max_subcolumns_count = 6 [default = 0]; }; // A flattened representation of a tree of column types obtained by depth-first diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 235c1cb2837..80347dff13c 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -105,7 +105,7 @@ enum TTypeNodeType { ARRAY, MAP, STRUCT, - VARIANT, + VARIANT, // unused } enum TStorageBackendType { @@ -136,6 +136,9 @@ struct TScalarType { // Only set for DECIMAL 3: optional i32 precision 4: optional i32 scale + + // Only set for VARIANT + 5: optional i32 variant_max_subcolumns_count = 0; } // Represents a field in a STRUCT type. 
@@ -279,6 +282,7 @@ struct TColumnType { 3: optional i32 index_len 4: optional i32 precision 5: optional i32 scale + 6: optional i32 variant_max_subcolumns_count = 0; } // A TNetworkAddress is the standard host, port representation of a diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out index 2b014bc0674..574464e9e86 100644 Binary files a/regression-test/data/variant_p0/delete_update.out and b/regression-test/data/variant_p0/delete_update.out differ diff --git a/regression-test/data/variant_p0/test_sub_path_pruning.out b/regression-test/data/variant_p0/test_sub_path_pruning.out index ae75160a91d..54570025f52 100644 Binary files a/regression-test/data/variant_p0/test_sub_path_pruning.out and b/regression-test/data/variant_p0/test_sub_path_pruning.out differ diff --git a/regression-test/data/variant_p0/update/load.out b/regression-test/data/variant_p0/update/load.out new file mode 100644 index 00000000000..c1c66031e7c Binary files /dev/null and b/regression-test/data/variant_p0/update/load.out differ diff --git a/regression-test/data/variant_p0/update/query.out b/regression-test/data/variant_p0/update/query.out new file mode 100644 index 00000000000..ea7dff7c6fb Binary files /dev/null and b/regression-test/data/variant_p0/update/query.out differ diff --git a/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out b/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out index 0adc70c83aa..ed982672fa5 100644 Binary files a/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out and b/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out differ diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy index 3d905d07e05..67140a1d6da 100644 --- a/regression-test/suites/variant_p0/delete_update.groovy +++ b/regression-test/suites/variant_p0/delete_update.groovy @@ -28,15 +28,15 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 3 - properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "false", "variant_enable_flatten_nested" = "true"); + properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "false", "variant_enable_flatten_nested" = "false"); """ // test mor table - sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0, "d" : [{"x" : 1}]}')""" - sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0, "d" : [{"y" : 1}]}')""" - sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0, "d" : [{"o" : 1}]}')""" - sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0, "d" : [{"p" : 1}]}')""" - sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0, "d" : [{"q" : 1}]}')""" + sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.1, "d" : [{"x" : 1}]}')""" + sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.1, "d" : [{"y" : 1}]}')""" + sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.1, "d" : [{"o" : 1}]}')""" + sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.1, "d" : [{"p" : 1}]}')""" + sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.1, "d" : [{"q" : 1}]}')""" sql "delete from ${table_name} where k = 1" sql """update ${table_name} set v = '{"updated_value":123}' where k = 2""" diff --git a/regression-test/suites/variant_p0/element_function.groovy 
b/regression-test/suites/variant_p0/element_function.groovy index 7b5e55ea53b..51555508c33 100644 --- a/regression-test/suites/variant_p0/element_function.groovy +++ b/regression-test/suites/variant_p0/element_function.groovy @@ -16,7 +16,8 @@ // under the License. suite("regression_test_variant_element_at", "p0") { - sql """ + sql """ DROP TABLE IF EXISTS element_fn_test """ + sql """ CREATE TABLE IF NOT EXISTS element_fn_test( k bigint, v variant, diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy index cd5e9ee523d..4b61f46e696 100644 --- a/regression-test/suites/variant_p0/load.groovy +++ b/regression-test/suites/variant_p0/load.groovy @@ -323,6 +323,7 @@ suite("regression_test_variant", "p0"){ // test mow with delete table_name = "variant_mow" + sql """ DROP TABLE IF EXISTS ${table_name} """ sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( k bigint, diff --git a/regression-test/suites/variant_p0/select_partition.groovy b/regression-test/suites/variant_p0/select_partition.groovy index a057e3b9a1d..c5e30aebc7d 100644 --- a/regression-test/suites/variant_p0/select_partition.groovy +++ b/regression-test/suites/variant_p0/select_partition.groovy @@ -60,6 +60,7 @@ suite("query_on_specific_partition") { qt_sql """select * from t_p temporary partition tp1;""" + sql """ DROP TABLE IF EXISTS test_iot """ sql """ CREATE TABLE IF NOT EXISTS test_iot ( `test_int` int NOT NULL, diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy index 1210c57e3bc..77dd439f1bf 100644 --- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy +++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy @@ -26,7 +26,7 @@ suite("variant_sub_path_pruning", "variant_type"){ ) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) - PROPERTIES("replication_num"="1") + PROPERTIES("replication_num"="1", "variant_max_subcolumns_count" = "0") """ sql """ @@ -156,19 +156,19 @@ suite("variant_sub_path_pruning", "variant_type"){ order_qt_sql """select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test;""" order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test union all select dt as c1 from pruning_test union all select dt as c1 from pruning_test) v1;""" order_qt_sql """select c1['b'] from (select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test) v1;""" - order_qt_sql """select c1['d'] from (select dt['a'] as c1 from pruning_test union all select dt['b'] as c1 from pruning_test union all select dt['c'] as c1 from pruning_test) v1;""" + // order_qt_sql """select c1['d'] from (select dt['a'] as c1 from pruning_test union all select dt['b'] as c1 from pruning_test union all select dt['c'] as c1 from pruning_test) v1;""" order_qt_sql """select c1['d'] from (select dt['a'] as c1 from pruning_test union all select dt['b'] as c1 from pruning_test union all select dt['b'] as c1 from pruning_test) v1;""" order_qt_sql """select c1['c']['d'] from (select dt['a']['b'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test union all select dt as c1 from pruning_test) v1;""" // one table + one const list order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100;""" order_qt_sql 
"""select c1['a'] from (select id, c1 from (select cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" + order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as variant) as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" + order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;""" order_qt_sql """select id, cast(c1['c'] as text) from (select cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by 1, 2 limit 100;""" order_qt_sql """select c1['c'] from (select id, c1 from (select cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" - // order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp order by 1;""" - // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" + order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp order by 1;""" + order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;""" // two const list order_qt_sql """select id, cast(c1['a'] as text) from (select cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as variant) as c1, 2 as id) tmp order by id limit 100;""" diff --git a/regression-test/suites/variant_p0/update/load.groovy b/regression-test/suites/variant_p0/update/load.groovy new file mode 100644 index 00000000000..a857a912da3 --- /dev/null +++ b/regression-test/suites/variant_p0/update/load.groovy @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("update_test_load", "p0") { + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def table_name = "test_update" + + sql "DROP TABLE IF EXISTS ${table_name}" + sql "DROP MATERIALIZED VIEW IF EXISTS regression_test_variant_p0_update.table_mv2;" + sql "DROP MATERIALIZED VIEW IF EXISTS regression_test_variant_p0_update.table_mv4;" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 6 + properties("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + for (int i = 0; i < 10; i++) { + load_json_data.call(table_name, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + + qt_sql """ select count() from ${table_name} """ + + createMV ("create materialized view table_mv1 as select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name};") + + explain { + sql("select max(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;") + contains("table_mv1 chose") + } + explain { + sql("select min(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;") + contains("table_mv1 chose") + } + explain { + sql("select count(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;") + contains("table_mv1 chose") + } + + qt_sql """ select max(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """ + qt_sql """ select min(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """ + qt_sql """ select count(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """ + + sql """ + CREATE MATERIALIZED VIEW table_mv2 BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 6 PROPERTIES +('replication_num' = '1') AS SELECT cast(v['type'] as text), cast(v['public'] as int) FROM ${table_name}; + """ + waitingMTMVTaskFinishedByMvName("table_mv2") + + explain { + sql("SELECT sum(cast(v['public'] as int)) FROM ${table_name} group by cast(v['type'] as text) order by cast(v['type'] as text);") + contains("table_mv2 chose") + } + + qt_sql """ SELECT sum(cast(v['public'] as int)) FROM 
${table_name} group by cast(v['type'] as text) order by cast(v['type'] as text); """ + + + def create_table_load_data = {create_table_name-> + sql "DROP TABLE IF EXISTS ${create_table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${create_table_name} ( + k bigint, + v variant NOT NULL + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 6 + properties("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + for (int i = 0; i < 10; i++) { + load_json_data.call(create_table_name, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + } + + create_table_load_data.call("test_update_sc") + create_table_load_data.call("test_update_compact") + create_table_load_data.call("test_update_sc2") +} diff --git a/regression-test/suites/variant_p0/update/query.groovy b/regression-test/suites/variant_p0/update/query.groovy new file mode 100644 index 00000000000..efc3b880548 --- /dev/null +++ b/regression-test/suites/variant_p0/update/query.groovy @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("update_test_query", "p0") { + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def table_name = "test_update" + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def compaction = {compact_table_name -> + + def tablets = sql_return_maparray """ show tablets from ${compact_table_name}; """ + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def backend_id = tablet.BackendId + def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + // wait for all compactions done + for (def tablet in tablets) { + Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(() -> { + Thread.sleep(10000) + String tablet_id = tablet.TabletId + def backend_id = tablet.BackendId + def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase()) + }); + } + + + for (def tablet in tablets) { + int afterSegmentCount = 0 + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + logger.info("rowset is: " + rowset) + afterSegmentCount += Integer.parseInt(rowset.split(" ")[1]) + } + assertEquals(afterSegmentCount, 1) + } + } + + for (int i = 0; i < 10; i++) { + load_json_data.call(table_name, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + + def normal_check = { + qt_sql """ select count() from ${table_name} """ + qt_sql """ select v['actor'] from ${table_name} order by k limit 1""" + qt_sql """ select count(cast (v['repo']['url'] as text)) from ${table_name} group by cast (v['type'] as text) order by cast (v['type'] as text)""" + qt_sql """ select max(cast (v['public'] as tinyint)) from ${table_name}""" + } + + def dbName = "regression_test_variant_p0_update" + + // mv1, mv2 + def mv_check = { + sql 'REFRESH MATERIALIZED VIEW table_mv2 AUTO' + waitingMTMVTaskFinished(getJobName(dbName, 'table_mv2')) + explain { + sql("select max(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;") + contains("table_mv1 chose") + } + explain { + sql("select min(kk) from (select (abs(cast(v['repo']['id'] as int)) + 
cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;")
+            contains("table_mv1 chose")
+        }
+        explain {
+            sql("select count(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv;")
+            contains("table_mv1 chose")
+        }
+
+        explain {
+            sql("SELECT sum(cast(v['public'] as int)) FROM ${table_name} group by cast(v['type'] as text) order by cast(v['type'] as text);")
+            contains("table_mv2 chose")
+        }
+
+        qt_sql """ select max(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """
+        qt_sql """ select min(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """
+        qt_sql """ select count(kk) from (select (abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from ${table_name}) as mv; """
+        qt_sql """ SELECT sum(cast(v['public'] as int)) FROM ${table_name} group by cast(v['type'] as text) order by cast(v['type'] as text); """
+    }
+
+    // mv3, mv4
+    def mv_check2 = {
+        sql 'REFRESH MATERIALIZED VIEW table_mv4 AUTO'
+        waitingMTMVTaskFinished(getJobName(dbName, 'table_mv4'))
+        explain {
+            sql("select max(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2;")
+            contains("table_mv3 chose")
+        }
+        explain {
+            sql("select min(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2;")
+            contains("table_mv3 chose")
+        }
+        explain {
+            sql("select count(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2;")
+            contains("table_mv3 chose")
+        }
+        explain {
+            sql("SELECT cast(v['payload']['before'] as text) FROM ${table_name} order by cast(v['actor']['id'] as int) limit 1; ")
+            contains("table_mv4 chose")
+        }
+        qt_sql """ select max(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2; """
+        qt_sql """ select min(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2; """
+        qt_sql """ select count(element) from (select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name}) as mv2; """
+        qt_sql """ SELECT cast(v['payload']['before'] as text) FROM ${table_name} order by cast(v['actor']['id'] as int) limit 1; """
+    }
+
+    createMV ("create materialized view table_mv3 as select (abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from ${table_name};")
+
+    sql """
+        CREATE MATERIALIZED VIEW table_mv4 BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 1 PROPERTIES
+('replication_num' = '1') AS SELECT cast(v['payload']['before'] as text), cast(v['actor']['id'] as int) FROM ${table_name};
+    """
+    waitingMTMVTaskFinishedByMvName("table_mv4")
+
+    normal_check.call()
+    mv_check.call()
+    mv_check2.call()
+
+    compaction.call(table_name)
+
+    normal_check.call()
+    mv_check.call()
+    mv_check2.call()
+
+    def table_name_sc = "test_update_sc"
+
+    for (int i = 0; i < 10; i++) {
+        load_json_data.call(table_name_sc, """${getS3Url() + '/regression/load/ghdata_sample.json'}""")
+    }
+
+    def schema_change = {schema_change_table_name ->
+        def tablets = sql_return_maparray """ show tablets from ${schema_change_table_name}; """
+        Set<String> rowsetids = new HashSet<>();
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                int segmentCount = Integer.parseInt(rowset.split(" ")[1])
+                if (segmentCount == 0) {
+                    continue;
+                }
+                String rowsetid = rowset.split(" ")[4];
+                rowsetids.add(rowsetid)
+                logger.info("rowsetid: " + rowsetid)
+            }
+        }
+        sql """ alter table ${schema_change_table_name} modify column v variant null"""
+        Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(() -> {
+            Thread.sleep(10000)
+            tablets = sql_return_maparray """ show tablets from ${schema_change_table_name}; """
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                def (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    int segmentCount = Integer.parseInt(rowset.split(" ")[1])
+                    if (segmentCount == 0) {
+                        continue;
+                    }
+                    String rowsetid = rowset.split(" ")[4];
+                    logger.info("rowsetid: " + rowsetid)
+                    assertTrue(!rowsetids.contains(rowsetid))
+                }
+            }
+        });
+    }
+
+    def sql_check = { check_table_name ->
+        qt_sql """ select count() from ${check_table_name} """
+        qt_sql """ select v['actor'] from ${check_table_name} order by k limit 1"""
+        qt_sql """ select count(cast (v['repo']['url'] as text)) from ${check_table_name} group by cast (v['type'] as text) order by cast (v['type'] as text) """
+        qt_sql """ select max(cast (v['public'] as tinyint)) from ${check_table_name}"""
+    }
+
+    sql_check.call(table_name_sc)
+    schema_change.call(table_name_sc)
+    sql_check.call(table_name_sc)
+
+    def table_name_compact = "test_update_compact"
+
+    sql_check.call(table_name_compact)
+    compaction.call(table_name_compact)
+    sql_check.call(table_name_compact)
+
+    def table_name_sc2 = "test_update_sc2"
+
+    sql_check.call(table_name_sc2)
+    schema_change.call(table_name_sc2)
+    sql_check.call(table_name_sc2)
+}
diff --git a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
index 5d753b97382..2051d819dd8 100644
--- a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
+++ b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
@@ -47,7 +47,6 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
     try {
         set_be_config.call("write_buffer_size", "10240")
-        set_be_config.call("variant_max_subcolumns_count", "2")
         sql """ DROP TABLE IF EXISTS ${tableName} """
         sql """
@@ -59,7 +58,8 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
            DISTRIBUTED BY HASH(`k`) BUCKETS 1
            PROPERTIES (
                "replication_num" = "1",
-                "disable_auto_compaction" = "true"
+                "disable_auto_compaction" = "true",
+                "variant_max_subcolumns_count" = "3"
            );
        """
@@ -71,7 +71,7 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
        for (def tablet in tablets) {
            String tablet_id = tablet.TabletId
            backend_id = tablet.BackendId
-            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def compactJson = parseJson(out.trim())
@@ -163,7 +163,7 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
     qt_select_6_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42005;"""
     qt_select_all_bfcompact """SELECT k, v['a'], v['b'], v['xxxx'], v['point'], v['ddddd'] from ${tableName} where (cast(v['point'] as int) = 1);"""
-    GetDebugPoint().enableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats", [stats: "24588,12292,12291,3",subcolumns:"a,b"])
+    GetDebugPoint().enableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats", [stats: "24588,12292,12291,3",subcolumns:"a,b,xxxx"])
     triger_compaction.call()
     /**
     variant_statistics {
diff --git a/regression-test/suites/variant_p2/load.groovy b/regression-test/suites/variant_p2/load.groovy
index a737ef943bb..b193b63e927 100644
--- a/regression-test/suites/variant_p2/load.groovy
+++ b/regression-test/suites/variant_p2/load.groovy
@@ -65,7 +65,7 @@ suite("load_p2", "variant_type,p2"){
            )
            DUPLICATE KEY(`id`)
            DISTRIBUTED BY HASH(id) BUCKETS ${buckets}
-            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+            properties("replication_num" = "1", "disable_auto_compaction" = "false", "variant_max_subcolumns_count" = "500");
        """
    }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org