xiaokang commented on code in PR #23498:
URL: https://github.com/apache/doris/pull/23498#discussion_r1314162358


##########
be/src/olap/delta_writer.cpp:
##########
@@ -51,6 +51,7 @@
 #include "util/mem_info.h"
 #include "util/ref_count_closure.h"
 #include "util/stopwatch.hpp"
+#include "util/time.h"

Review Comment:
   useless include?



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -598,7 +575,8 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
                 if (tablet_column.has_default_value()) {
                     mutable_full_columns[cids_missing[i]]->insert_from(
                             *mutable_default_value_columns[i].get(), 0);
-                } else if (tablet_column.is_nullable()) {
+                } else if (tablet_column.is_nullable() &&
+                           
mutable_full_columns[cids_missing[i]]->can_be_inside_nullable()) {

Review Comment:
   unrelated to variant?



##########
be/src/vec/columns/column.h:
##########
@@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> {
         return nullptr;
     }
 
+    /// Some columns may require finalization before using of other operations.
+    virtual void finalize() {}
+
+    MutablePtr clone_finalized() const {
+        auto finalized = IColumn::mutate(get_ptr());

Review Comment:
   no clone



##########
be/src/olap/rowset/beta_rowset_writer_v2.h:
##########
@@ -102,6 +98,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {
         return nullptr;
     }
 
+    RowsetWriterContext& mutable_context() override { LOG(FATAL) << "not 
implemented"; }

Review Comment:
   Is v2 not used?



##########
be/src/olap/rowset/rowset_writer_context.h:
##########
@@ -40,24 +40,25 @@ struct RowsetWriterContext {
     RowsetWriterContext()
             : tablet_id(0),
               tablet_schema_hash(0),
-              index_id(0),
               partition_id(0),
+              index_id(0),
               rowset_type(BETA_ROWSET),
               rowset_state(PREPARED),
               version(Version(0, 0)),
               sender_id(0),
               txn_id(0),
               tablet_uid(0, 0),
-              segments_overlap(OVERLAP_UNKNOWN) {
+              segments_overlap(OVERLAP_UNKNOWN),
+              schema_lock(new std::mutex) {
         load_id.set_hi(0);
         load_id.set_lo(0);
     }
 
     RowsetId rowset_id;
     int64_t tablet_id;
     int64_t tablet_schema_hash;
-    int64_t index_id;
     int64_t partition_id;
+    int64_t index_id;

Review Comment:
   why change the order of index_id?



##########
be/src/olap/rowset/rowset_writer.h:
##########
@@ -151,6 +151,8 @@ class RowsetWriter {
 
     virtual int64_t segment_writer_ns() { return 0; }
 
+    virtual RowsetWriterContext& mutable_context() = 0;

Review Comment:
   why need a mutable context?



##########
be/src/vec/columns/column.h:
##########
@@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> {
         return nullptr;
     }
 
+    /// Some columns may require finalization before using of other operations.
+    virtual void finalize() {}
+
+    MutablePtr clone_finalized() const {
+        auto finalized = IColumn::mutate(get_ptr());
+        finalized->finalize();
+        return finalized;
+    }
+
     // Only used on ColumnDictionary
     virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> 
rowset_segment_id) {}
 
     virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { 
return {}; }
-    // todo(Amory) from column to get data type is not correct ,column is 
memory data,can not to assume memory data belong to which data type
-    virtual TypeIndex get_data_type() const {

Review Comment:
   Why delete it?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -750,76 +757,130 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
-                                                TabletSchemaSPtr& 
flush_schema) {
-    if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+                                                      TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
         return Status::OK();
     }
 
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _context.tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
+    std::vector<int> variant_column_pos;
+    if (_context.tablet_schema->is_partial_update()) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context.tablet_schema->get_update_cids()) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                return Status::InvalidArgument("Not implement partial updates 
for variant");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
     }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    if (variant_column_pos.empty()) {
         return Status::OK();
     }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
+    //     static     extracted
+    // | --------- | ----------- |
     // The static ones are original _tablet_schame columns
-    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context.tablet_schema);
     vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_context.tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+
+    // If column already exist in original tablet schema, then we pick common 
type
+    // and cast column to common type, and modify tablet column to common type,
+    // otherwise it's a new column, we should add to frontend
+    auto append_column = [&](const TabletColumn& parent_variant, auto& 
column_entry_from_object) {
+        const std::string& column_name =
+                parent_variant.name_lower_case() + "." + 
column_entry_from_object->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                column_entry_from_object->data.get_least_common_type();
+        TabletColumn tablet_column;
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(parent_variant.name_lower_case(), false)
+                                 
.append(column_entry_from_object->path.get_parts(), false)
+                                 .build();
+        vectorized::schema_util::get_column_by_type(
+                final_data_type_from_object, column_name, tablet_column,
+                vectorized::schema_util::ExtraInfo {.unique_id = -1,
+                                                    .parent_unique_id = 
parent_variant.unique_id(),
+                                                    .path_info = full_path});
+        flush_schema->append_column(std::move(tablet_column));
+        
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+                            final_data_type_from_object, column_name});
+    };
+
+    // 1. Flatten variant column into flat columns, append flatten columns to 
the back of original Block and TabletSchema
+    // those columns are extracted columns, leave none extracted columns 
remain in original variant column, which is
+    // JSONB format at present.
+    // 2. Collect columns that need to be added or modified when data type 
changes or new columns encountered
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        size_t variant_pos = variant_column_pos[i];
+        vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+        const TabletColumn& parent_column = 
_context.tablet_schema->columns()[variant_pos];
+        CHECK(object_column.is_finalized());
+        std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
+        for (auto& entry : object_column.get_subcolumns()) {
+            if (entry->path.empty()) {
+                // root
+                root = entry;
+                continue;
+            }
+            append_column(parent_column, entry);
         }
+        // Create new variant column and set root column
+        auto obj = vectorized::ColumnObject::create(true, false);
+        // '{}' indicates a root path
+        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
+                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
+                root->data.get_least_common_type());
+        flush_block.get_by_position(variant_pos).column = obj->get_ptr();
+        vectorized::PathInDataBuilder full_root_path_builder;
+        auto full_root_path =
+                full_root_path_builder.append(parent_column.name_lower_case(), 
false).build();
+        
flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path);
+        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
     }
 
-    // Ensure column are all present at this schema version.Otherwise there 
will be some senario:
-    //  Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added 
columns and schema version became 10
-    //  Load2 -> version(10) with schema [a, b, c] and has no extended columns 
and fetched the schema at version 10
-    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta 
with [a, b, c, d, e]
-    // So we should make sure that rowset at the same schema version alawys 
contain the same size of columns.
-    // so that all columns at schema_version is in either 
_context.tablet_schema or schema_change_recorder
-    for (const auto& [name, column] : schema_view.column_name_to_column) {
-        if (_context.tablet_schema->field_index(name) == -1) {
-            const auto& tcolumn = schema_view.column_name_to_column[name];
-            TabletColumn new_column(tcolumn);
-            _context.schema_change_recorder->add_extended_columns(column,
-                                                                  
schema_view.schema_version);
-        }
+    {
+        // Update rowset schema, tablet's tablet schema will be updated when 
build Rowset
+        // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+        // ctx.tablet_schema:  A(bigint), B(double)
+        // => update_schema:   A(bigint), B(double), C(int), D(int)
+        std::lock_guard<std::mutex> lock(*(_context.schema_lock));
+        TabletSchemaSPtr update_schema = std::make_shared<TabletSchema>();
+        
vectorized::schema_util::get_least_common_schema({_context.tablet_schema, 
flush_schema},

Review Comment:
   Is the order of columns in the table schema critical? If the original schema is 
(a INT, v VARIANT) and is expanded to (a INT, v VARIANT, v:x, v:y), and then the user runs 
ADD COLUMN c STRING, will c be added after a, after v:y, or will it cause an error?



##########
be/src/olap/rowset/rowset_writer_context.h:
##########
@@ -92,13 +93,13 @@ struct RowsetWriterContext {
     std::set<int32_t> skip_inverted_index;
     DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
     std::shared_ptr<Tablet> tablet = nullptr;
-    // for tracing local schema change record
-    std::shared_ptr<vectorized::schema_util::LocalSchemaChangeRecorder> 
schema_change_recorder =

Review Comment:
   why delete it?



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -107,12 +107,16 @@ SegmentWriter::~SegmentWriter() {
 void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
                                      const TabletColumn& column, 
TabletSchemaSPtr tablet_schema) {
     meta->set_column_id(column_id);
-    meta->set_unique_id(column.unique_id());

Review Comment:
   why delete it?



##########
be/src/olap/rowset/segment_v2/column_writer.cpp:
##########
@@ -353,6 +353,13 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
             *writer = std::move(writer_local);
             return Status::OK();
         }
+        case FieldType::OLAP_FIELD_TYPE_VARIANT: {

Review Comment:
   This can be combined into the is_scalar_type(column->type()) branch.



##########
be/src/vec/columns/column.h:
##########
@@ -145,15 +145,19 @@ class IColumn : public COW<IColumn> {
         return nullptr;
     }
 
+    /// Some columns may require finalization before using of other operations.
+    virtual void finalize() {}
+
+    MutablePtr clone_finalized() const {
+        auto finalized = IColumn::mutate(get_ptr());
+        finalized->finalize();
+        return finalized;
+    }
+
     // Only used on ColumnDictionary
     virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> 
rowset_segment_id) {}
 
     virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { 
return {}; }
-    // todo(Amory) from column to get data type is not correct ,column is 
memory data,can not to assume memory data belong to which data type
-    virtual TypeIndex get_data_type() const {

Review Comment:
   get_data_type() is added to ColumnXXX. It's the opposite of the original LOG_FATAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to