eldenmoon commented on code in PR #16335: URL: https://github.com/apache/doris/pull/16335#discussion_r1098399271
########## fe/fe-core/src/main/cup/sql_parser.cup: ########## @@ -1815,7 +1828,20 @@ create_stmt ::= opt_ext_properties:extProperties {: RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition, - distribution, tblProperties, extProperties, tableComment, index); + distribution, tblProperties, extProperties, tableComment, index, false); + :} + | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name + LPAREN column_definition_list:columns COMMA index_definition_list:indexes COMMA DOTDOTDOT RPAREN opt_engine:engineName Review Comment: but it's a little bit difficult to get the bool value from column_definition_list, it's simple to just add a DOTDOTDOT token after column_definition_list ########## gensrc/proto/types.proto: ########## @@ -104,6 +104,7 @@ message PGenericType { FIXEDLENGTHOBJECT = 30; JSONB = 31; DECIMAL128I = 32; + VARIANT = 33; Review Comment: indent ########## fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java: ########## @@ -678,7 +678,7 @@ public class SessionVariable implements Serializable, Writable { // 1. read related rowids along with necessary column data // 2. spawn fetch RPC to other nodes to get related data by sorted rowids @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT) - public boolean enableTwoPhaseReadOpt = true; + public boolean enableTwoPhaseReadOpt = false; Review Comment: ? 
########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); Review Comment: let the backend retry could be ok ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = 
ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { + // ignore column with same name + boolean hasSameNameColumn = false; + for (Column column : 
olapTable.getBaseSchema()) { + if (column.getName().equals(tColumnDef.getColumnDesc().getColumnName())) { + hasSameNameColumn = true; + } + } + // ignore this column + if (hasSameNameColumn) { + continue; Review Comment: the original will be casted to the existed ########## fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java: ########## @@ -169,6 +170,23 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { } } + if (destTable.isDynamicSchema()) { Review Comment: done ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java: ########## @@ -143,6 +144,21 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis } } + if (table.isDynamicSchema()) { Review Comment: for s3 load etc.. ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int 
schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { + // ignore column with same name + boolean hasSameNameColumn = false; + for (Column column : olapTable.getBaseSchema()) { + if (column.getName().equals(tColumnDef.getColumnDesc().getColumnName())) { + hasSameNameColumn = true; + } + } + // ignore this column + if (hasSameNameColumn) { + continue; + } + } + String comment = tColumnDef.getComment(); + if (comment == null || comment.length() == 0) { + Instant ins = Instant.ofEpochSecond(System.currentTimeMillis() / 1000); + ZonedDateTime zdt = ins.atZone(ZoneId.systemDefault()); + comment = "auto change " + zdt.toString(); + } + + TColumnDesc tColumnDesc = tColumnDef.getColumnDesc(); + ColumnDef columnDef = 
initColumnfromThrift(tColumnDesc, comment); + columnDefs.add(columnDef); + } + + if (!queryMode && !columnDefs.isEmpty()) { + //3.create AddColumnsClause Review Comment: done ########## gensrc/thrift/Types.thrift: ########## @@ -92,13 +92,16 @@ enum TPrimitiveType { DECIMAL128I, JSONB, UNSUPPORTED + DECIMAL128, Review Comment: removed ########## fe/fe-core/src/main/java/org/apache/doris/load/Load.java: ########## @@ -720,20 +720,46 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs, slotDesc.setIsNullable(tblColumn.isAllowNull()); } } else { - // columns default be varchar type - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); - // ISSUE A: src slot should be nullable even if the column is not nullable. - // because src slot is what we read from file, not represent to real column value. - // If column is not nullable, error will be thrown when filling the dest slot, - // which is not nullable. - slotDesc.setIsNullable(true); + if (formatType == TFileFormatType.FORMAT_JSON + && tbl instanceof OlapTable && ((OlapTable) tbl).isDynamicSchema()) { + slotDesc.setType(tblColumn.getType()); + slotDesc.setColumn(new Column(realColName, tblColumn.getType())); + slotDesc.setIsNullable(tblColumn.isAllowNull()); + } else { + // columns default be varchar type + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); + // ISSUE A: src slot should be nullable even if the column is not nullable. + // because src slot is what we read from file, not represent to real column value. + // If column is not nullable, error will be thrown when filling the dest slot, + // which is not nullable. 
+ slotDesc.setIsNullable(true); + } } slotDesc.setIsMaterialized(true); srcSlotIds.add(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } } + + // add a implict container column "__dynamic__" for dynamic columns Review Comment: done ########## be/src/olap/memtable.cpp: ########## @@ -148,7 +148,11 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); - auto target_block = input_block->copy_block(_column_offset); + vectorized::Block target_block = *input_block; + // maybe rollup tablet, dynamic table's tablet need full columns + if (!_tablet_schema->is_dynamic_schema()) { Review Comment: done ########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -428,7 +426,16 @@ Status VNodeChannel::add_block(vectorized::Block* block, std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first)); + if (UNLIKELY(!_cur_mutable_block)) { + _cur_mutable_block.reset(new vectorized::MutableBlock(block->clone_empty())); Review Comment: yes ########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -225,8 +225,6 @@ Status VNodeChannel::init(RuntimeState* state) { _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); - _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); Review Comment: need handle dynamic block ########## fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java: ########## @@ -70,6 +70,8 @@ public class TupleDescriptor { private float avgSerializedSize; // in bytes; includes serialization overhead + private int tableId = -1; Review Comment: schema rpc need tableId to identify a specific table ########## be/src/exec/base_scanner.cpp: ########## @@ -250,6 +265,28 @@ Status 
BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } + // handle dynamic generated columns Review Comment: No, new load scans will use vfile_scanner. ########## be/src/vec/columns/columns_common.h: ########## @@ -51,4 +51,45 @@ void filter_arrays_impl_only_data(const PaddedPODArray<T>& src_elems, PaddedPODArray<T>& res_elems, const IColumn::Filter& filt, ssize_t result_size_hint); +namespace detail { +template <typename T> +const PaddedPODArray<T>* get_indexes_data(const IColumn& indexes); +} + +/// Check limit <= indexes->size() and call column.index_impl(const PaddedPodArray<Type> & indexes, UInt64 limit). +template <typename Column> +ColumnPtr select_index_impl(const Column& column, const IColumn& indexes, size_t limit) { + if (limit == 0) { + limit = indexes.size(); + } + + if (indexes.size() < limit) { + LOG(FATAL) << "Size of indexes is less than required."; + } + + if (auto* data_uint8 = detail::get_indexes_data<UInt8>(indexes)) { + return column.template index_impl<UInt8>(*data_uint8, limit); + } else if (auto* data_uint16 = detail::get_indexes_data<UInt16>(indexes)) { + return column.template index_impl<UInt16>(*data_uint16, limit); + } else if (auto* data_uint32 = detail::get_indexes_data<UInt32>(indexes)) { + return column.template index_impl<UInt32>(*data_uint32, limit); + } else if (auto* data_uint64 = detail::get_indexes_data<UInt64>(indexes)) { Review Comment: No, index columns can only be unsigned integers. ########## be/src/runtime/primitive_type.cpp: ########## @@ -526,6 +526,39 @@ TTypeDesc gen_type_desc(const TPrimitiveType::type val, const std::string& name) return type_desc; } +PrimitiveType get_primitive_type(vectorized::TypeIndex v_type) { + switch (v_type) { + case vectorized::TypeIndex::Int8: + return PrimitiveType::TYPE_TINYINT; + case vectorized::TypeIndex::Int16: + return PrimitiveType::TYPE_SMALLINT; + case vectorized::TypeIndex::Int32: +
return PrimitiveType::TYPE_INT; + case vectorized::TypeIndex::Int64: + return PrimitiveType::TYPE_BIGINT; + case vectorized::TypeIndex::Float32: + return PrimitiveType::TYPE_FLOAT; + case vectorized::TypeIndex::Float64: + return PrimitiveType::TYPE_DOUBLE; + case vectorized::TypeIndex::Decimal32: + return PrimitiveType::TYPE_DECIMALV2; + case vectorized::TypeIndex::Array: + return PrimitiveType::TYPE_ARRAY; + case vectorized::TypeIndex::String: + return PrimitiveType::TYPE_STRING; + case vectorized::TypeIndex::Date: + return PrimitiveType::TYPE_DATE; + case vectorized::TypeIndex::DateTime: + return PrimitiveType::TYPE_DATETIME; + case vectorized::TypeIndex::Tuple: + return PrimitiveType::TYPE_STRUCT; + // TODO add vectorized::more types Review Comment: done ########## be/src/vec/io/var_int.h: ########## @@ -19,6 +19,7 @@ #include <iostream> +#include "vec/common/string_buffer.hpp" Review Comment: just make compiler happy ########## be/src/runtime/types.h: ########## @@ -201,6 +202,8 @@ struct TypeDescriptor { bool is_bitmap_type() const { return type == TYPE_OBJECT; } + bool is_variant() const { return type == TYPE_VARIANT; } Review Comment: done ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult 
addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { Review Comment: ok ########## be/src/vec/data_types/get_least_supertype.h: ########## @@ -30,6 +33,13 @@ namespace doris::vectorized { * Examples: least common supertype for UInt8, Int8 - Int16. * Examples: there is no least common supertype for Array(UInt8), Int8. 
*/ -DataTypePtr get_least_supertype(const DataTypes& types); Review Comment: I think there is no need for backward compatibility? ########## be/src/olap/rowset/rowset_meta.h: ########## @@ -367,7 +367,6 @@ class RowsetMeta { int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); } void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) { - DCHECK(_schema == nullptr); Review Comment: set_tablet_schema could override original _schema ########## be/src/vec/core/block.cpp: ########## @@ -678,11 +682,14 @@ Block Block::copy_block(const std::vector<int>& column_offset) const { return columns_with_type_and_name; } -void Block::append_block_by_selector(MutableColumns& columns, - const IColumn::Selector& selector) const { - DCHECK(data.size() == columns.size()); +void Block::append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const { + if (dst->get_block_type() == BlockType::DYNAMIC) { Review Comment: done ########## gensrc/thrift/Types.thrift: ########## @@ -92,13 +92,16 @@ enum TPrimitiveType { DECIMAL128I, JSONB, UNSUPPORTED + DECIMAL128, + VARIANT, } enum TTypeNodeType { SCALAR, ARRAY, MAP, - STRUCT + STRUCT, + VARIANT = 100, Review Comment: strange ########## be/src/vec/functions/if.cpp: ########## @@ -122,7 +122,10 @@ class FunctionIf : public IFunction { } DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { - return get_least_supertype({arguments[1], arguments[2]}); + DataTypePtr type = nullptr; + get_least_supertype(DataTypes {arguments[1], arguments[2]}, &type); Review Comment: type == nullptr means a conflict was encountered ########## be/src/vec/common/schema_util.cpp: ########## @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <vec/columns/column_array.h> +#include <vec/columns/column_object.h> +#include <vec/common/schema_util.h> +#include <vec/core/field.h> +#include <vec/data_types/data_type_array.h> +#include <vec/data_types/data_type_object.h> +#include <vec/functions/simple_function_factory.h> +#include <vec/json/parse2column.h> + +#include <vec/data_types/data_type_factory.hpp> + +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "olap/rowset/rowset_writer_context.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "util/thrift_rpc_helper.h" + +namespace doris::vectorized::schema_util { + +size_t get_number_of_dimensions(const IDataType& type) { + if (const auto* type_array = typeid_cast<const DataTypeArray*>(&type)) { + return type_array->get_number_of_dimensions(); + } + return 0; +} +size_t get_number_of_dimensions(const IColumn& column) { + if (const auto* column_array = check_and_get_column<ColumnArray>(column)) { + return column_array->get_number_of_dimensions(); + } + return 0; +} + +DataTypePtr get_base_type_of_array(const DataTypePtr& type) { + /// Get raw pointers to avoid extra copying of type pointers. 
+ const DataTypeArray* last_array = nullptr; + const auto* current_type = type.get(); + while (const auto* type_array = typeid_cast<const DataTypeArray*>(current_type)) { + current_type = type_array->get_nested_type().get(); + last_array = type_array; + } + return last_array ? last_array->get_nested_type() : type; +} + +Array create_empty_array_field(size_t num_dimensions) { + DCHECK(num_dimensions > 0); + Array array; + Array* current_array = &array; + for (size_t i = 1; i < num_dimensions; ++i) { + current_array->push_back(Array()); + current_array = &current_array->back().get<Array&>(); + } + return array; +} + +FieldType get_field_type(const IDataType* data_type) { + switch (data_type->get_type_id()) { + case TypeIndex::UInt8: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_TINYINT; + case TypeIndex::UInt16: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_SMALLINT; + case TypeIndex::UInt32: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT; + case TypeIndex::UInt64: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT; + case TypeIndex::Int8: + return FieldType::OLAP_FIELD_TYPE_TINYINT; + case TypeIndex::Int16: + return FieldType::OLAP_FIELD_TYPE_SMALLINT; + case TypeIndex::Int32: + return FieldType::OLAP_FIELD_TYPE_INT; + case TypeIndex::Int64: + return FieldType::OLAP_FIELD_TYPE_BIGINT; + case TypeIndex::Float32: + return FieldType::OLAP_FIELD_TYPE_FLOAT; + case TypeIndex::Float64: + return FieldType::OLAP_FIELD_TYPE_DOUBLE; + case TypeIndex::Decimal32: + return FieldType::OLAP_FIELD_TYPE_DECIMAL; + case TypeIndex::Array: + return FieldType::OLAP_FIELD_TYPE_ARRAY; + case TypeIndex::String: + return FieldType::OLAP_FIELD_TYPE_STRING; + case TypeIndex::Date: + return FieldType::OLAP_FIELD_TYPE_DATE; + case TypeIndex::DateTime: + return FieldType::OLAP_FIELD_TYPE_DATETIME; + case TypeIndex::Tuple: + return FieldType::OLAP_FIELD_TYPE_STRUCT; + // TODO add more types + default: + LOG(FATAL) << "unknow type"; + return FieldType::OLAP_FIELD_TYPE_UNKNOWN; + } +} + +Status
parse_object_column(ColumnObject& dest, const IColumn& src, bool need_finalize, + const int* row_begin, const int* row_end) { + assert(src.is_column_string()); + const ColumnString* parsing_column {nullptr}; + if (!src.is_nullable()) { + parsing_column = reinterpret_cast<const ColumnString*>(src.get_ptr().get()); + } else { + auto nullable_column = reinterpret_cast<const ColumnNullable*>(src.get_ptr().get()); + parsing_column = reinterpret_cast<const ColumnString*>( + nullable_column->get_nested_column().get_ptr().get()); + } + std::vector<StringRef> jsons; + if (row_begin != nullptr) { + assert(row_end); + for (auto x = row_begin; x != row_end; ++x) { + StringRef ref = parsing_column->get_data_at(*x); + jsons.push_back(ref); + } + } else { + for (size_t i = 0; i < parsing_column->size(); ++i) { + StringRef ref = parsing_column->get_data_at(i); + jsons.push_back(ref); + } + } + // batch parse + RETURN_IF_ERROR(parse_json_to_variant(dest, jsons)); + + if (need_finalize) { + dest.finalize(); + } + return Status::OK(); +} + +Status parse_object_column(Block& block, size_t position) { + // parse variant column and rewrite column + auto col = block.get_by_position(position).column; + const std::string& col_name = block.get_by_position(position).name; + if (!col->is_column_string()) { + return Status::InvalidArgument("only ColumnString can be parsed to ColumnObject"); + } + vectorized::DataTypePtr type( + std::make_shared<vectorized::DataTypeObject>("", col->is_nullable())); + auto column_object = type->create_column(); + RETURN_IF_ERROR( + parse_object_column(assert_cast<ColumnObject&>(column_object->assume_mutable_ref()), + *col, true /*need finalize*/, nullptr, nullptr)); + // replace by object + block.safe_get_by_position(position).column = column_object->get_ptr(); + block.safe_get_by_position(position).type = type; + block.safe_get_by_position(position).name = col_name; + return Status::OK(); +} + +void flatten_object(Block& block, size_t pos, bool 
replace_if_duplicated) { + auto column_object_ptr = + assert_cast<ColumnObject*>(block.get_by_position(pos).column->assume_mutable().get()); + if (column_object_ptr->empty()) { + block.erase(pos); + return; + } + size_t num_rows = column_object_ptr->size(); + assert(block.rows() <= num_rows); + assert(column_object_ptr->is_finalized()); + Columns subcolumns; + DataTypes types; + Names names; + for (auto& subcolumn : column_object_ptr->get_subcolumns()) { + subcolumns.push_back(subcolumn->data.get_finalized_column().get_ptr()); + types.push_back(subcolumn->data.get_least_common_type()); + names.push_back(subcolumn->path.get_path()); + } + block.erase(pos); + for (size_t i = 0; i < subcolumns.size(); ++i) { + // block may already contains this column, eg. key columns, we should ignore + // or replcace the same column from object subcolumn + if (block.has(names[i])) { + if (replace_if_duplicated) { + auto& column_type_name = block.get_by_name(names[i]); + column_type_name.column = subcolumns[i]; + column_type_name.type = types[i]; + } + continue; + } + block.insert(ColumnWithTypeAndName {subcolumns[i], types[i], names[i]}); + } + + // fill default value + for (auto& [column, _1, _2] : block.get_columns_with_type_and_name()) { + if (column->size() < num_rows) { + column->assume_mutable()->insert_many_defaults(num_rows - column->size()); + } + } +} + +Status flatten_object(Block& block, bool replace_if_duplicated) { + auto object_pos = + std::find_if(block.begin(), block.end(), [](const ColumnWithTypeAndName& column) { + return column.type->get_type_id() == TypeIndex::VARIANT; + }); + if (object_pos != block.end()) { + flatten_object(block, object_pos - block.begin(), replace_if_duplicated); + } + return Status::OK(); +} + +bool is_conversion_required_between_integers(const IDataType& lhs, const IDataType& rhs) { + WhichDataType which_lhs(lhs); + WhichDataType which_rhs(rhs); + bool is_native_int = which_lhs.is_native_int() && which_rhs.is_native_int(); + bool 
is_native_uint = which_lhs.is_native_uint() && which_rhs.is_native_uint(); + return (is_native_int || is_native_uint) && + lhs.get_size_of_value_in_memory() <= rhs.get_size_of_value_in_memory(); +} + +bool is_conversion_required_between_integers(FieldType lhs, FieldType rhs) { + // We only support signed integers for semi-structure data at present + // TODO add unsigned integers + if (lhs == OLAP_FIELD_TYPE_BIGINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT || + rhs == OLAP_FIELD_TYPE_INT || rhs == OLAP_FIELD_TYPE_BIGINT); + } + if (lhs == OLAP_FIELD_TYPE_INT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT || + rhs == OLAP_FIELD_TYPE_INT); + } + if (lhs == OLAP_FIELD_TYPE_SMALLINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT); + } + if (lhs == OLAP_FIELD_TYPE_TINYINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT); + } + return true; +} + +Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, ColumnPtr* result) { + ColumnsWithTypeAndName arguments {arg, + {type->create_column_const_with_default_value(1), type, ""}}; + auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type); + Block tmp_block {arguments}; + // the 0 position is input argument, the 1 position is to type argument, the 2 position is result argument + vectorized::ColumnNumbers argnum; + argnum.emplace_back(0); + argnum.emplace_back(1); + size_t result_column = tmp_block.columns(); + tmp_block.insert({nullptr, type, arg.name}); + RETURN_IF_ERROR( + function->execute(nullptr, tmp_block, argnum, result_column, arg.column->size())); + *result = std::move(tmp_block.get_by_position(result_column).column); + return Status::OK(); +} + +static void get_column_def(const vectorized::DataTypePtr& data_type, const std::string& name, + TColumnDef* column) { + if (!name.empty()) { + column->columnDesc.__set_columnName(name); + } + if (data_type->is_nullable()) { + 
const auto& real_type = static_cast<const DataTypeNullable&>(*data_type); + column->columnDesc.__set_isAllowNull(true); + get_column_def(real_type.get_nested_type(), "", column); + return; + } + column->columnDesc.__set_columnType(to_thrift(get_primitive_type(data_type->get_type_id()))); + if (data_type->get_type_id() == TypeIndex::Array) { + TColumnDef child; + column->columnDesc.__set_children({}); + get_column_def(assert_cast<const DataTypeArray*>(data_type.get())->get_nested_type(), "", + &child); + column->columnDesc.columnLength = + TabletColumn::get_field_length_by_type(column->columnDesc.columnType, 0); + column->columnDesc.children.push_back(child.columnDesc); + return; + } + if (data_type->get_type_id() == TypeIndex::Tuple) { + // TODO + // auto tuple_type = assert_cast<const DataTypeTuple*>(data_type.get()); + // DCHECK_EQ(tuple_type->get_elements().size(), tuple_type->get_element_names().size()); + // for (size_t i = 0; i < tuple_type->get_elements().size(); ++i) { + // TColumnDef child; + // get_column_def(tuple_type->get_element(i), tuple_type->get_element_names()[i], &child); + // column->columnDesc.children.push_back(child.columnDesc); + // } + // return; + } + if (data_type->get_type_id() == TypeIndex::String) { + return; + } + if (WhichDataType(*data_type).is_simple()) { + column->columnDesc.__set_columnLength(data_type->get_size_of_value_in_memory()); + return; + } +} + +// send an empty add columns rpc, the rpc response will fill with base schema info +// maybe we could seperate this rpc from add columns rpc +Status send_fetch_full_base_schema_view_rpc(FullBaseSchemaView* schema_view) { Review Comment: `fetch` could describe `get result` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org