eldenmoon commented on code in PR #26749:
URL: https://github.com/apache/doris/pull/26749#discussion_r1405565755


##########
be/src/olap/rowset/segment_creator.cpp:
##########
@@ -40,32 +49,178 @@ SegmentFlusher::SegmentFlusher() = default;
 
 SegmentFlusher::~SegmentFlusher() = default;
 
-Status SegmentFlusher::init(const RowsetWriterContext& rowset_writer_context) {
-    _context = rowset_writer_context;
+Status SegmentFlusher::init(RowsetWriterContext& rowset_writer_context) {
+    _context = &rowset_writer_context;
     return Status::OK();
 }
 
 Status SegmentFlusher::flush_single_block(const vectorized::Block* block, 
int32_t segment_id,
-                                          int64_t* flush_size, 
TabletSchemaSPtr flush_schema) {
+                                          int64_t* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
     }
-    bool no_compression = block->bytes() <= 
config::segment_compression_threshold_kb * 1024;
+    TabletSchemaSPtr flush_schema = nullptr;
+    // Expand variant columns
+    vectorized::Block flush_block(*block);
+    if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
+        _context->tablet_schema->num_variant_columns() > 0) {
+        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, 
flush_schema));
+    }
+    bool no_compression = flush_block.bytes() <= 
config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
-        _context.tablet_schema->cluster_key_idxes().empty()) {
+        _context->tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     }
     return Status::OK();
 }
 
+Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
+                                                     TabletSchemaSPtr& 
flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    std::vector<int> variant_column_pos;
+    if (_context->partial_update_info && 
_context->partial_update_info->is_partial_update) {
+        // check columns that used to do partial updates should not include 
variant
+        for (int i : _context->partial_update_info->update_cids) {
+            const auto& col = _context->tablet_schema->columns()[i];
+            if (!col.is_key() && col.name() != DELETE_SIGN) {
+                return Status::InvalidArgument(
+                        "Not implement partial update for variant only support 
delete currently");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context->tablet_schema->columns().size(); ++i) {
+            if (_context->tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    try {
+        // Parse each variant column from raw string column
+        vectorized::schema_util::parse_variant_columns(block, 
variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, 
variant_column_pos,
+                                                          false /*not ingore 
sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encounter execption " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
+    // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
+    //     static     extracted
+    // | --------- | ----------- |
+    // The static ones are original _tablet_schame columns
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context->original_tablet_schema);
+
+    vectorized::Block flush_block(std::move(block));
+    // If column already exist in original tablet schema, then we pick common 
type
+    // and cast column to common type, and modify tablet column to common type,
+    // otherwise it's a new column, we should add to frontend
+    auto append_column = [&](const TabletColumn& parent_variant, auto& 
column_entry_from_object) {
+        const std::string& column_name =
+                parent_variant.name_lower_case() + "." + 
column_entry_from_object->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                column_entry_from_object->data.get_least_common_type();
+        TabletColumn tablet_column;
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(parent_variant.name_lower_case(), false)
+                                 
.append(column_entry_from_object->path.get_parts(), false)
+                                 .build();
+        vectorized::schema_util::get_column_by_type(

Review Comment:
   done



##########
be/src/olap/tablet_schema.cpp:
##########
@@ -1040,9 +1180,12 @@ bool 
TabletSchema::has_inverted_index_with_index_id(int32_t index_id) const {
     return false;
 }
 
-const TabletIndex* TabletSchema::get_inverted_index(int32_t col_unique_id) 
const {
-    // TODO use more efficient impl
+const TabletIndex* TabletSchema::get_inverted_index(int32_t col_unique_id,
+                                                    const std::string& 
suffix_path) const {
     for (size_t i = 0; i < _indexes.size(); i++) {
+        if (_indexes[i].get_escaped_index_suffix_path() != 
escape_for_path_name(suffix_path)) {

Review Comment:
   done



##########
be/src/olap/tablet_schema.cpp:
##########
@@ -1030,8 +1166,12 @@ bool TabletSchema::has_inverted_index(int32_t 
col_unique_id) const {
     return false;
 }
 
-bool TabletSchema::has_inverted_index_with_index_id(int32_t index_id) const {
+bool TabletSchema::has_inverted_index_with_index_id(int32_t index_id,
+                                                    const std::string& 
suffix_name) const {
     for (size_t i = 0; i < _indexes.size(); i++) {
+        if (_indexes[i].get_escaped_index_suffix_path() != suffix_name) {

Review Comment:
   done



##########
be/src/vec/common/schema_util.cpp:
##########
@@ -289,70 +303,140 @@ void update_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
         TabletColumn common_column;
         // const std::string& column_name = variant_col_name + "." + 
tuple_paths[i].get_path();
         get_column_by_type(tuple_types[i], tuple_paths[i].get_path(), 
common_column,
-                           ExtraInfo {.unique_id = -1,
+                           ExtraInfo {.unique_id = variant_col_unique_id,
                                       .parent_unique_id = 
variant_col_unique_id,
                                       .path_info = tuple_paths[i]});
         common_schema->append_column(common_column);
     }
 }
 
-void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
-                             TabletSchemaSPtr& common_schema) {
-    // Pick tablet schema with max schema version
-    const TabletSchemaSPtr base_schema =
-            *std::max_element(schemas.cbegin(), schemas.cend(),
-                              [](const TabletSchemaSPtr a, const 
TabletSchemaSPtr b) {
-                                  return a->schema_version() < 
b->schema_version();
-                              });
-    CHECK(base_schema);
-    CHECK(common_schema);
-    common_schema->copy_from(*base_schema);
-    // Merge columns from other schemas
-    common_schema->clear_columns();
-    std::vector<int32_t> variant_column_unique_id;
-    // Get all columns without extracted columns and collect variant col 
unique id
-    for (const TabletColumn& col : base_schema->columns()) {
-        if (col.is_variant_type()) {
-            variant_column_unique_id.push_back(col.unique_id());
+void inherit_tablet_index(TabletSchemaSPtr& schema) {
+    std::unordered_map<int32_t, TabletIndex> variants_index_meta;
+    // Get all variants tablet index metas if exist
+    for (const auto& col : schema->columns()) {
+        auto index_meta = schema->get_inverted_index(col.unique_id(), "");
+        if (col.is_variant_type() && index_meta != nullptr) {
+            variants_index_meta.emplace(col.unique_id(), *index_meta);
         }
+    }
+
+    // Add index meta if extracted column is missing index meta
+    for (const auto& col : schema->columns()) {
         if (!col.is_extracted_column()) {
-            common_schema->append_column(col);
+            continue;
+        }
+        auto it = variants_index_meta.find(col.parent_unique_id());
+        // variant has no index meta, ignore
+        if (it == variants_index_meta.end()) {
+            continue;
+        }
+        auto index_meta = schema->get_inverted_index(col);
+        // add index meta
+        TabletIndex index_info = it->second;
+        
index_info.set_escaped_escaped_index_suffix_path(col.path_info().get_path());
+        if (index_meta != nullptr) {
+            // already exist
+            schema->update_index(col, index_info);

Review Comment:
   TabletIndex is updated



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -427,6 +428,13 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr 
rowset) {
     if (rowset->rowset_meta()->has_delete_predicate()) {
         
_rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
     }
+    // Update the tablet schema in the rowset metadata if the tablet schema 
contains a variant.
+    // During the build process, _context.tablet_schema will be used as the 
rowset schema.
+    // This situation may arise in the event of a linked schema change. If 
this schema is not set,
+    // the subcolumns of the variant will be lost.
+    if (_context.tablet_schema->num_variant_columns() > 0 && 
rowset->tablet_schema() != nullptr) {
+        _context.tablet_schema = rowset->tablet_schema();

Review Comment:
   No, `add_rowset` should be single-threaded.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -445,15 +453,9 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* 
block, int32_t segmen
         return Status::OK();
     }
 
-    TabletSchemaSPtr flush_schema;
-    if (_context.tablet_schema->num_variant_columns() > 0) {
-        // Unfold variant column
-        RETURN_IF_ERROR(expand_variant_to_subcolumns(*block, flush_schema));

Review Comment:
   The variant expansion was moved out of `flush_memtable` into `SegmentFlusher` to make heavy schema change feasible for variant columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to