eldenmoon commented on code in PR #24554:
URL: https://github.com/apache/doris/pull/24554#discussion_r1336908766


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -759,76 +766,130 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
-                                                TabletSchemaSPtr& 
flush_schema) {
-    if (block.rows() == 0) {
+Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
+                                                      TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
         return Status::OK();
     }
 
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _context.tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
+    std::vector<int> variant_column_pos;
+    if (_context.tablet_schema->is_partial_update()) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context.tablet_schema->get_update_cids()) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                return Status::InvalidArgument("Not implement partial updates 
for variant");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
+            if (_context.tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
     }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+
+    if (variant_column_pos.empty()) {
         return Status::OK();
     }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
+    //     static     extracted
+    // | --------- | ----------- |
     // The static ones are original _tablet_schame columns
-    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context.tablet_schema);
     vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_context.tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+
+    // If column already exist in original tablet schema, then we pick common 
type
+    // and cast column to common type, and modify tablet column to common type,
+    // otherwise it's a new column, we should add to frontend
+    auto append_column = [&](const TabletColumn& parent_variant, auto& 
column_entry_from_object) {
+        const std::string& column_name =
+                parent_variant.name_lower_case() + "." + 
column_entry_from_object->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                column_entry_from_object->data.get_least_common_type();
+        TabletColumn tablet_column;
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(parent_variant.name_lower_case(), false)
+                                 
.append(column_entry_from_object->path.get_parts(), false)
+                                 .build();
+        vectorized::schema_util::get_column_by_type(
+                final_data_type_from_object, column_name, tablet_column,
+                vectorized::schema_util::ExtraInfo {.unique_id = -1,
+                                                    .parent_unique_id = 
parent_variant.unique_id(),
+                                                    .path_info = full_path});
+        flush_schema->append_column(std::move(tablet_column));
+        
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
+                            final_data_type_from_object, column_name});
+    };
+
+    // 1. Flatten variant column into flat columns, append flatten columns to 
the back of original Block and TabletSchema
+    // those columns are extracted columns, leave none extracted columns 
remain in original variant column, which is
+    // JSONB format at present.
+    // 2. Collect columns that need to be added or modified when data type 
changes or new columns encountered
+    for (size_t i = 0; i < variant_column_pos.size(); ++i) {
+        size_t variant_pos = variant_column_pos[i];
+        vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
+        const TabletColumn& parent_column = 
_context.tablet_schema->columns()[variant_pos];
+        CHECK(object_column.is_finalized());
+        std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
+        for (auto& entry : object_column.get_subcolumns()) {
+            if (entry->path.empty()) {
+                // root
+                root = entry;
+                continue;
+            }
+            append_column(parent_column, entry);
         }
+        // Create new variant column and set root column
+        auto obj = vectorized::ColumnObject::create(true, false);
+        // '{}' indicates a root path
+        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
+                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
+                root->data.get_least_common_type());
+        flush_block.get_by_position(variant_pos).column = obj->get_ptr();
+        vectorized::PathInDataBuilder full_root_path_builder;
+        auto full_root_path =
+                full_root_path_builder.append(parent_column.name_lower_case(), 
false).build();
+        
flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path);
+        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
     }
 
-    // Ensure column are all present at this schema version.Otherwise there 
will be some senario:
-    //  Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added 
columns and schema version became 10
-    //  Load2 -> version(10) with schema [a, b, c] and has no extended columns 
and fetched the schema at version 10
-    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta 
with [a, b, c, d, e]
-    // So we should make sure that rowset at the same schema version alawys 
contain the same size of columns.
-    // so that all columns at schema_version is in either 
_context.tablet_schema or schema_change_recorder
-    for (const auto& [name, column] : schema_view.column_name_to_column) {
-        if (_context.tablet_schema->field_index(name) == -1) {
-            const auto& tcolumn = schema_view.column_name_to_column[name];
-            TabletColumn new_column(tcolumn);
-            _context.schema_change_recorder->add_extended_columns(column,
-                                                                  
schema_view.schema_version);
-        }
+    {
+        // Update rowset schema, tablet's tablet schema will be updated when 
build Rowset
+        // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+        // ctx.tablet_schema:  A(bigint), B(double)
+        // => update_schema:   A(bigint), B(double), C(int), D(int)
+        std::lock_guard<std::mutex> lock(*(_context.schema_lock));

Review Comment:
   Memtable flushing is concurrent, so this rowset schema update has to be done while holding the schema lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to