xiaokang commented on code in PR #23498: URL: https://github.com/apache/doris/pull/23498#discussion_r1314162358
########## be/src/olap/delta_writer.cpp: ########## @@ -51,6 +51,7 @@ #include "util/mem_info.h" #include "util/ref_count_closure.h" #include "util/stopwatch.hpp" +#include "util/time.h" Review Comment: useless include? ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -598,7 +575,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f if (tablet_column.has_default_value()) { mutable_full_columns[cids_missing[i]]->insert_from( *mutable_default_value_columns[i].get(), 0); - } else if (tablet_column.is_nullable()) { + } else if (tablet_column.is_nullable() && + mutable_full_columns[cids_missing[i]]->can_be_inside_nullable()) { Review Comment: unrelated to variant? ########## be/src/vec/columns/column.h: ########## @@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> { return nullptr; } + /// Some columns may require finalization before using of other operations. + virtual void finalize() {} + + MutablePtr clone_finalized() const { + auto finalized = IColumn::mutate(get_ptr()); Review Comment: no clone ########## be/src/olap/rowset/beta_rowset_writer_v2.h: ########## @@ -102,6 +98,8 @@ class BetaRowsetWriterV2 : public RowsetWriter { return nullptr; } + RowsetWriterContext& mutable_context() override { LOG(FATAL) << "not implemented"; } Review Comment: Is v2 not used? ########## be/src/olap/rowset/rowset_writer_context.h: ########## @@ -40,24 +40,25 @@ struct RowsetWriterContext { RowsetWriterContext() : tablet_id(0), tablet_schema_hash(0), - index_id(0), partition_id(0), + index_id(0), rowset_type(BETA_ROWSET), rowset_state(PREPARED), version(Version(0, 0)), sender_id(0), txn_id(0), tablet_uid(0, 0), - segments_overlap(OVERLAP_UNKNOWN) { + segments_overlap(OVERLAP_UNKNOWN), + schema_lock(new std::mutex) { load_id.set_hi(0); load_id.set_lo(0); } RowsetId rowset_id; int64_t tablet_id; int64_t tablet_schema_hash; - int64_t index_id; int64_t partition_id; + int64_t index_id; Review Comment: why change the order of index_id? ########## be/src/olap/rowset/rowset_writer.h: ########## @@ -151,6 +151,8 @@ class RowsetWriter { virtual int64_t segment_writer_ns() { return 0; } + virtual RowsetWriterContext& mutable_context() = 0; Review Comment: why need a mutable context? ########## be/src/vec/columns/column.h: ########## @@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> { return nullptr; } + /// Some columns may require finalization before using of other operations. + virtual void finalize() {} + + MutablePtr clone_finalized() const { + auto finalized = IColumn::mutate(get_ptr()); + finalized->finalize(); + return finalized; + } + // Only used on ColumnDictionary virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> rowset_segment_id) {} virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { return {}; } - // todo(Amory) from column to get data type is not correct ,column is memory data,can not to assume memory data belong to which data type - virtual TypeIndex get_data_type() const { Review Comment: why delete it ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -750,76 +757,130 @@ Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( return Status::OK(); } -Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block, - TabletSchemaSPtr& flush_schema) { - if (block.rows() == 0) { +Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block, + TabletSchemaSPtr& flush_schema) { + size_t num_rows = block.rows(); + if (num_rows == 0) { return Status::OK(); } - // Sanitize block to match exactly from the same type of frontend meta - vectorized::schema_util::FullBaseSchemaView schema_view; - schema_view.table_id = _context.tablet_schema->table_id(); - vectorized::ColumnWithTypeAndName* variant_column = - block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME); - if (!variant_column) { - return Status::OK(); + std::vector<int> variant_column_pos; + if (_context.tablet_schema->is_partial_update()) { + // check columns that used to do partial updates should not include variant + for (int i : _context.tablet_schema->get_update_cids()) { + if (_context.tablet_schema->columns()[i].is_variant_type()) { + return Status::InvalidArgument("Not implement partial updates for variant"); + } + } + } else { + for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) { + if (_context.tablet_schema->columns()[i].is_variant_type()) { + variant_column_pos.push_back(i); + } + } } - auto base_column = variant_column->column; - vectorized::ColumnObject& object_column = - assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref()); - if (object_column.empty()) { - block.erase(BeConsts::DYNAMIC_COLUMN_NAME); + + if (variant_column_pos.empty()) { return Status::OK(); } - object_column.finalize(); - // Has extended columns - RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view)); + + try { + // Parse each variant column from raw string column + vectorized::schema_util::parse_variant_columns(block, variant_column_pos); + vectorized::schema_util::finalize_variant_columns(block, variant_column_pos, + false /*not ingore sparse*/); + vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_column_pos); + } catch (const doris::Exception& e) { + // TODO more graceful, max_filter_ratio + LOG(WARNING) << "encounter execption " << e.to_string(); + return Status::InternalError(e.to_string()); + } + // Dynamic Block consists of two parts, dynamic part of columns and static part of columns - // static dynamic - // | ----- | ------- | + // static extracted + // | --------- | ----------- | // The static ones are original _tablet_schame columns - flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema); + flush_schema = std::make_shared<TabletSchema>(); + flush_schema->copy_from(*_context.tablet_schema); vectorized::Block flush_block(std::move(block)); - // The dynamic ones are auto generated and extended, append them the the orig_block - for (auto& entry : object_column.get_subcolumns()) { - const std::string& column_name = entry->path.get_path(); - auto column_iter = schema_view.column_name_to_column.find(column_name); - if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) { - // Column maybe dropped by light weight schema change DDL - continue; - } - TabletColumn column(column_iter->second); - auto data_type = vectorized::DataTypeFactory::instance().create_data_type( - column, column.is_nullable()); - // Dynamic generated columns does not appear in original tablet schema - if (_context.tablet_schema->field_index(column.name()) < 0) { - flush_schema->append_column(column); - flush_block.insert({data_type->create_column(), data_type, column.name()}); + + // If column already exist in original tablet schema, then we pick common type + // and cast column to common type, and modify tablet column to common type, + // otherwise it's a new column, we should add to frontend + auto append_column = [&](const TabletColumn& parent_variant, auto& column_entry_from_object) { + const std::string& column_name = + parent_variant.name_lower_case() + "." + column_entry_from_object->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + column_entry_from_object->data.get_least_common_type(); + TabletColumn tablet_column; + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false) + .append(column_entry_from_object->path.get_parts(), false) + .build(); + vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, tablet_column, + vectorized::schema_util::ExtraInfo {.unique_id = -1, + .parent_unique_id = parent_variant.unique_id(), + .path_info = full_path}); + flush_schema->append_column(std::move(tablet_column)); + flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(), + final_data_type_from_object, column_name}); + }; + + // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema + // those columns are extracted columns, leave none extracted columns remain in original variant column, which is + // JSONB format at present. + // 2. Collect columns that need to be added or modified when data type changes or new columns encountered + for (size_t i = 0; i < variant_column_pos.size(); ++i) { + size_t variant_pos = variant_column_pos[i]; + vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>( + flush_block.get_by_position(variant_pos).column->assume_mutable_ref()); + const TabletColumn& parent_column = _context.tablet_schema->columns()[variant_pos]; + CHECK(object_column.is_finalized()); + std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root; + for (auto& entry : object_column.get_subcolumns()) { + if (entry->path.empty()) { + // root + root = entry; + continue; + } + append_column(parent_column, entry); } + // Create new variant column and set root column + auto obj = vectorized::ColumnObject::create(true, false); + // '{}' indicates a root path + static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column( + {}, root->data.get_finalized_column_ptr()->assume_mutable(), + root->data.get_least_common_type()); + flush_block.get_by_position(variant_pos).column = obj->get_ptr(); + vectorized::PathInDataBuilder full_root_path_builder; + auto full_root_path = + full_root_path_builder.append(parent_column.name_lower_case(), false).build(); + flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path); + VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); } - // Ensure column are all present at this schema version.Otherwise there will be some senario: - // Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added columns and schema version became 10 - // Load2 -> version(10) with schema [a, b, c] and has no extended columns and fetched the schema at version 10 - // Load2 will persist meta with [a, b, c] but Load1 will persist meta with [a, b, c, d, e] - // So we should make sure that rowset at the same schema version alawys contain the same size of columns. - // so that all columns at schema_version is in either _context.tablet_schema or schema_change_recorder - for (const auto& [name, column] : schema_view.column_name_to_column) { - if (_context.tablet_schema->field_index(name) == -1) { - const auto& tcolumn = schema_view.column_name_to_column[name]; - TabletColumn new_column(tcolumn); - _context.schema_change_recorder->add_extended_columns(column, - schema_view.schema_version); - } + { + // Update rowset schema, tablet's tablet schema will be updated when build Rowset + // Eg. flush schema: A(int), B(float), C(int), D(int) + // ctx.tablet_schema: A(bigint), B(double) + // => update_schema: A(bigint), B(double), C(int), D(int) + std::lock_guard<std::mutex> lock(*(_context.schema_lock)); + TabletSchemaSPtr update_schema = std::make_shared<TabletSchema>(); + vectorized::schema_util::get_least_common_schema({_context.tablet_schema, flush_schema}, Review Comment: Is the order of column in table schema critical? If the original schema is (a INT, v VARIANT) and expended to (a INT, v VARIANT, v:x, v:y), and then user ADD COLUMN c STRING, c will be added after a or v:y or cause error? ########## be/src/olap/rowset/rowset_writer_context.h: ########## @@ -92,13 +93,13 @@ struct RowsetWriterContext { std::set<int32_t> skip_inverted_index; DataWriteType write_type = DataWriteType::TYPE_DEFAULT; std::shared_ptr<Tablet> tablet = nullptr; - // for tracing local schema change record - std::shared_ptr<vectorized::schema_util::LocalSchemaChangeRecorder> schema_change_recorder = Review Comment: why delete it? ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -107,12 +107,16 @@ SegmentWriter::~SegmentWriter() { void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column, TabletSchemaSPtr tablet_schema) { meta->set_column_id(column_id); - meta->set_unique_id(column.unique_id()); Review Comment: why delete it? ########## be/src/olap/rowset/segment_v2/column_writer.cpp: ########## @@ -353,6 +353,13 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* *writer = std::move(writer_local); return Status::OK(); } + case FieldType::OLAP_FIELD_TYPE_VARIANT: { Review Comment: can be combined to is_scalar_type(column->type()) branch. ########## be/src/vec/columns/column.h: ########## @@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> { return nullptr; } + /// Some columns may require finalization before using of other operations. + virtual void finalize() {} + + MutablePtr clone_finalized() const { + auto finalized = IColumn::mutate(get_ptr()); + finalized->finalize(); + return finalized; + } + // Only used on ColumnDictionary virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> rowset_segment_id) {} virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { return {}; } - // todo(Amory) from column to get data type is not correct ,column is memory data,can not to assume memory data belong to which data type - virtual TypeIndex get_data_type() const { Review Comment: get_data_type() is added to ColumnXXX. It's opposite to original LOG_FATAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org