Tanya-W commented on code in PR #16371:
URL: https://github.com/apache/doris/pull/16371#discussion_r1098318886


##########
be/src/olap/schema_change.cpp:
##########
@@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
     return Status::OK();
 }
 
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+        const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+        const TabletSchemaSPtr& tablet_schema)
+        : SchemaChange(),
+          _alter_inverted_indexs(alter_inverted_indexs),
+          _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+    VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr 
rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet,
+                                             TabletSchemaSPtr 
base_tablet_schema) {
+    Status res = Status::OK();
+    if (rowset_reader->rowset()->empty() || 
rowset_reader->rowset()->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    std::vector<ColumnId> return_columns;
+    for (auto& inverted_index : _alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = _tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // create inverted index writer
+    auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+    std::string segment_dir = base_tablet->tablet_path();
+    auto fs = rowset_meta->fs();
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        std::string segment_filename =
+                fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), 
i);
+        for (auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            auto column_name = inverted_index.columns[0];
+            auto column = _tablet_schema->column(column_name);
+            auto index_id = inverted_index.index_id;
+
+            std::unique_ptr<Field> field(FieldFactory::create(column));
+            _index_metas.emplace_back(new TabletIndex());
+            _index_metas.back()->init_from_thrift(inverted_index, 
*_tablet_schema);
+            std::unique_ptr<segment_v2::InvertedIndexColumnWriter> 
inverted_index_builder;
+            try {
+                RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+                        field.get(), &inverted_index_builder, 
segment_filename, segment_dir,
+                        _index_metas.back().get(), fs));
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+
+            if (inverted_index_builder) {
+                std::string writer_sign = fmt::format("{}_{}", i, index_id);
+                _inverted_index_builders.insert(
+                        std::make_pair(writer_sign, 
std::move(inverted_index_builder)));
+            }
+        }
+    }
+
+    SegmentCacheHandle segment_cache_handle;
+    // load segments
+    RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+            std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()), 
&segment_cache_handle,
+            false));
+
+    // create iterator for each segment
+    StorageReadOptions read_options;
+    OlapReaderStatistics stats;
+    read_options.stats = &stats;
+    read_options.tablet_schema = _tablet_schema;
+    std::unique_ptr<Schema> schema =
+            std::make_unique<Schema>(_tablet_schema->columns(), 
return_columns);
+    for (auto& seg_ptr : segment_cache_handle.get_segments()) {
+        std::unique_ptr<RowwiseIterator> iter;
+        res = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!res.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+                         << "]: " << res.to_string();
+            return Status::Error<ROWSET_READER_INIT>();
+        }
+
+        std::shared_ptr<vectorized::Block> block =
+                
std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
+        do {
+            block->clear_column_data();
+            res = iter->next_batch(block.get());
+            if (!res.ok()) {
+                if (res.is<END_OF_FILE>()) {
+                    break;
+                }
+                RETURN_NOT_OK_LOG(
+                        res, "failed to read next block when schema change for 
inverted index.");
+            }
+
+            // copy block
+            auto ref_block = *block;
+
+            // write inverted index
+            if (_write_inverted_index(iter->data_id(), &ref_block) != 
Status::OK()) {
+                res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>();
+                LOG(WARNING) << "failed to write block.";
+                return res;
+            }
+        } while (block->rows() != 0);
+    }
+
+    // finish write inverted index, flush data to compound file
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        for (auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            auto column_name = inverted_index.columns[0];
+            auto column = _tablet_schema->column(column_name);
+            auto index_id = inverted_index.index_id;
+            std::string writer_sign = fmt::format("{}_{}", i, index_id);
+            try {
+                if (_inverted_index_builders[writer_sign]) {
+                    _inverted_index_builders[writer_sign]->finish();
+                }
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+        }
+    }
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+
+    LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows();
+    return res;
+}
+
+Status SchemaChangeForInvertedIndex::_add_nullable(const std::string& 
column_name,
+                                                   const std::string& 
index_writer_sign,
+                                                   Field* field, const 
uint8_t* null_map,
+                                                   const uint8_t** ptr, size_t 
num_rows) {
+    size_t offset = 0;
+    auto next_run_step = [&]() {
+        size_t step = 1;
+        for (auto i = offset + 1; i < num_rows; ++i) {
+            if (null_map[offset] == null_map[i])
+                step++;
+            else
+                break;
+        }
+        return step;
+    };
+
+    try {
+        do {
+            auto step = next_run_step();
+            if (null_map[offset]) {
+                
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_nulls(step));
+                *ptr += field->size() * step;
+            } else {
+                if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+                    DCHECK(field->get_sub_field_count() == 1);
+                    const auto* col_cursor = reinterpret_cast<const 
CollectionValue*>(*ptr);
+                    
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
+                            field->get_sub_field(0)->size(), col_cursor, 
step));
+                } else {
+                    
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values(
+                            column_name, *ptr, step));
+                }
+            }
+            offset += step;
+        } while (offset < num_rows);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeForInvertedIndex::_add_data(const std::string& column_name,
+                                               const std::string& 
index_writer_sign, Field* field,
+                                               const uint8_t** ptr, size_t 
num_rows) {
+    try {
+        if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+            DCHECK(field->get_sub_field_count() == 1);
+            const auto* col_cursor = reinterpret_cast<const 
CollectionValue*>(*ptr);
+            
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
+                    field->get_sub_field(0)->size(), col_cursor, num_rows));
+        } else {
+            
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values(
+                    column_name, *ptr, num_rows));
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeForInvertedIndex::_write_inverted_index(int32_t segment_idx,
+                                                           vectorized::Block* 
block) {
+    LOG(INFO) << "begin to write inverted index";

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to