xiaokang commented on code in PR #16371:
URL: https://github.com/apache/doris/pull/16371#discussion_r1095475224


##########
be/src/olap/schema_change.cpp:
##########
@@ -958,6 +1232,270 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     return res;
 }
 
+Status SchemaChangeHandler::_do_process_alter_inverted_index(
+        TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) {
+    Status res = Status::OK();
+    // TODO(wy): check whether the tablet's max continuous version == 
request.version
+    if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() 
== TABLET_STOPPED ||
+        tablet->tablet_state() == TABLET_SHUTDOWN) {
+        LOG(WARNING) << "tablet's state=" << tablet->tablet_state()
+                     << " cannot alter inverted index";
+        return Status::Error<ErrorCode::INTERNAL_ERROR>();
+    }
+
+    std::shared_lock base_migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
+        return Status::Error<TRY_LOCK_FAILED>();
+    }
+
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->copy_from(*tablet->tablet_schema());
+    if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
+        tablet_schema->clear_columns();
+        for (const auto& column : request.columns) {
+            tablet_schema->append_column(TabletColumn(column));
+        }
+    }
+
+    // get rowset reader
+    std::vector<RowsetReaderSharedPtr> rs_readers;
+    DeleteHandler delete_handler;
+    RETURN_IF_ERROR(
+            _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, 
&delete_handler));
+    if (request.__isset.is_drop_op && request.is_drop_op) {
+        // drop index
+        res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request);
+    } else {
+        // add index
+        res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, 
tablet, request);
+    }
+
+    if (!res.ok()) {
+        LOG(WARNING) << "failed to alter tablet. tablet=" << 
tablet->full_name();
+        return res;
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet,
+                                                const TabletSchemaSPtr& 
tablet_schema,
+                                                const TAlterInvertedIndexReq& 
request,
+                                                
std::vector<RowsetReaderSharedPtr>* rs_readers,
+                                                DeleteHandler* delete_handler) 
{
+    Status res = Status::OK();
+    std::vector<Version> versions_to_be_changed;
+    std::vector<ColumnId> return_columns;
+    std::vector<TOlapTableIndex> alter_inverted_indexs;
+
+    if (request.__isset.alter_inverted_indexes) {
+        alter_inverted_indexs = request.alter_inverted_indexes;
+    }
+
+    for (auto& inverted_index : alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // obtain base tablet's push lock and header write lock to prevent loading 
data
+    {
+        std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock());
+        std::lock_guard<std::shared_mutex> 
tablet_wlock(tablet->get_header_lock());
+
+        do {

Review Comment:
   Is there an existing function that can be reused here to create rs_readers?



##########
be/src/olap/task/engine_alter_tablet_task.cpp:
##########
@@ -45,4 +45,34 @@ Status EngineAlterTabletTask::execute() {
     return res;
 } // execute
 
+EngineAlterInvertedIndexTask::EngineAlterInvertedIndexTask(
+        const TAlterInvertedIndexReq& alter_inverted_index_request)
+        : _alter_inverted_index_req(alter_inverted_index_request) {
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
+            MemTrackerLimiter::Type::SCHEMA_CHANGE,
+            fmt::format("EngineAlterInvertedIndexTask#tabletId={}",
+                        std::to_string(_alter_inverted_index_req.tablet_id)),
+            config::memory_limitation_per_thread_for_schema_change_bytes);
+}
+
+Status EngineAlterInvertedIndexTask::execute() {
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    DorisMetrics::instance()->create_rollup_requests_total->increment(1);

Review Comment:
   The metric name is not suitable: this task is an inverted index alteration, but it increments create_rollup_requests_total.



##########
be/src/olap/schema_change.cpp:
##########
@@ -958,6 +1232,270 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     return res;
 }
 
+Status SchemaChangeHandler::_do_process_alter_inverted_index(
+        TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) {
+    Status res = Status::OK();
+    // TODO(wy): check whether the tablet's max continuous version == 
request.version
+    if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() 
== TABLET_STOPPED ||
+        tablet->tablet_state() == TABLET_SHUTDOWN) {
+        LOG(WARNING) << "tablet's state=" << tablet->tablet_state()
+                     << " cannot alter inverted index";
+        return Status::Error<ErrorCode::INTERNAL_ERROR>();
+    }
+
+    std::shared_lock base_migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
+        return Status::Error<TRY_LOCK_FAILED>();
+    }
+
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->copy_from(*tablet->tablet_schema());
+    if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
+        tablet_schema->clear_columns();
+        for (const auto& column : request.columns) {
+            tablet_schema->append_column(TabletColumn(column));
+        }
+    }
+
+    // get rowset reader
+    std::vector<RowsetReaderSharedPtr> rs_readers;
+    DeleteHandler delete_handler;
+    RETURN_IF_ERROR(
+            _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, 
&delete_handler));
+    if (request.__isset.is_drop_op && request.is_drop_op) {
+        // drop index
+        res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request);
+    } else {
+        // add index
+        res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, 
tablet, request);
+    }
+
+    if (!res.ok()) {
+        LOG(WARNING) << "failed to alter tablet. tablet=" << 
tablet->full_name();
+        return res;
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet,
+                                                const TabletSchemaSPtr& 
tablet_schema,
+                                                const TAlterInvertedIndexReq& 
request,
+                                                
std::vector<RowsetReaderSharedPtr>* rs_readers,
+                                                DeleteHandler* delete_handler) 
{
+    Status res = Status::OK();
+    std::vector<Version> versions_to_be_changed;
+    std::vector<ColumnId> return_columns;
+    std::vector<TOlapTableIndex> alter_inverted_indexs;
+
+    if (request.__isset.alter_inverted_indexes) {
+        alter_inverted_indexs = request.alter_inverted_indexes;
+    }
+
+    for (auto& inverted_index : alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // obtain base tablet's push lock and header write lock to prevent loading 
data
+    {
+        std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock());
+        std::lock_guard<std::shared_mutex> 
tablet_wlock(tablet->get_header_lock());
+
+        do {
+            RowsetSharedPtr max_rowset;
+            // get history data to rebuild inverted index and it will check if 
there is hold in base tablet
+            res = _get_versions_to_be_changed(tablet, &versions_to_be_changed, 
&max_rowset);
+            if (!res.ok()) {
+                LOG(WARNING) << "fail to get version to be rebuild inverted 
index. res=" << res;
+                break;
+            }
+
+            // should check the max_version >= request.alter_version, if not 
the rebuild index is useless
+            if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
+                LOG(WARNING) << "base tablet's max version="
+                             << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
+                             << " is less than request version=" << 
request.alter_version;
+                res = Status::InternalError(
+                        "base tablet's max version={} is less than request 
version={}",
+                        (max_rowset == nullptr ? 0 : 
max_rowset->end_version()),
+                        request.alter_version);
+                break;
+            }
+
+            // init one delete handler
+            int64_t end_version = -1;
+            for (auto& version : versions_to_be_changed) {
+                end_version = std::max(end_version, version.second);
+            }
+
+            auto& all_del_preds = tablet->delete_predicates();
+            for (auto& delete_pred : all_del_preds) {
+                if (delete_pred->version().first > end_version) {
+                    continue;
+                }
+                
tablet_schema->merge_dropped_columns(tablet->tablet_schema(delete_pred->version()));
+            }
+            res = delete_handler->init(tablet_schema, all_del_preds, 
end_version);
+            if (!res) {
+                LOG(WARNING) << "init delete handler failed. tablet=" << 
tablet->full_name()
+                             << ", end_version=" << end_version;
+                break;
+            }
+
+            // acquire data sources correspond to history versions
+            tablet->capture_rs_readers(versions_to_be_changed, rs_readers);
+            if (rs_readers->size() < 1) {
+                LOG(WARNING) << "fail to acquire all data sources. "
+                             << "version_num=" << versions_to_be_changed.size()
+                             << ", data_source_num=" << rs_readers->size();
+                res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>();
+                break;
+            }
+
+            // reader_context is stack variables, it's lifetime should keep 
the same with rs_readers
+            RowsetReaderContext reader_context;
+            reader_context.reader_type = READER_ALTER_TABLE;
+            reader_context.tablet_schema = tablet_schema;
+            reader_context.need_ordered_result = true;

Review Comment:
   The index must stay consistent with the segment data file row by row.
So is it correct to set need_ordered_result = true here?



##########
be/src/olap/schema_change.cpp:
##########
@@ -576,6 +583,248 @@ Status 
VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
     return Status::OK();
 }
 
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+        const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+        const TabletSchemaSPtr& tablet_schema)
+        : SchemaChange(),
+          _alter_inverted_indexs(alter_inverted_indexs),
+          _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+    VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr 
rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet,
+                                             TabletSchemaSPtr 
base_tablet_schema) {
+    Status res = Status::OK();
+    if (rowset_reader->rowset()->empty() || 
rowset_reader->rowset()->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    std::vector<ColumnId> return_columns;
+    for (auto& inverted_index : _alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = _tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // create inverted index writer
+    auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+    std::string segment_dir = base_tablet->tablet_path();
+    auto fs = rowset_meta->fs();
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {

Review Comment:
   The three loops over segments can be merged into one. That would also
reduce the number of _inverted_index_builders that are open at the same time.



##########
be/src/olap/schema_change.cpp:
##########
@@ -576,6 +583,248 @@ Status 
VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
     return Status::OK();
 }
 
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+        const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+        const TabletSchemaSPtr& tablet_schema)
+        : SchemaChange(),
+          _alter_inverted_indexs(alter_inverted_indexs),
+          _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+    VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr 
rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet,
+                                             TabletSchemaSPtr 
base_tablet_schema) {
+    Status res = Status::OK();
+    if (rowset_reader->rowset()->empty() || 
rowset_reader->rowset()->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    std::vector<ColumnId> return_columns;
+    for (auto& inverted_index : _alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = _tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // create inverted index writer
+    auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+    std::string segment_dir = base_tablet->tablet_path();
+    auto fs = rowset_meta->fs();
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        std::string segment_filename =
+                fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), 
i);
+        for (auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            auto column_name = inverted_index.columns[0];
+            auto column = _tablet_schema->column(column_name);
+            auto index_id = inverted_index.index_id;
+
+            std::unique_ptr<Field> field(FieldFactory::create(column));
+            _index_metas.emplace_back(new TabletIndex());
+            _index_metas.back()->init_from_thrift(inverted_index, 
*_tablet_schema);
+            std::unique_ptr<segment_v2::InvertedIndexColumnWriter> 
inverted_index_builder;
+            try {
+                RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+                        field.get(), &inverted_index_builder, 
segment_filename, segment_dir,
+                        _index_metas.back().get(), fs));
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+
+            if (inverted_index_builder) {
+                std::string writer_sign = fmt::format("{}_{}", i, index_id);
+                _inverted_index_builders.insert(
+                        std::make_pair(writer_sign, 
std::move(inverted_index_builder)));
+            }
+        }
+    }
+
+    SegmentCacheHandle segment_cache_handle;
+    // load segments
+    RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+            std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()), 
&segment_cache_handle,
+            false));
+
+    // create iterator for each segment
+    StorageReadOptions read_options;
+    OlapReaderStatistics stats;
+    read_options.stats = &stats;
+    read_options.tablet_schema = _tablet_schema;
+    std::unique_ptr<Schema> schema =
+            std::make_unique<Schema>(_tablet_schema->columns(), 
return_columns);
+    for (auto& seg_ptr : segment_cache_handle.get_segments()) {
+        std::unique_ptr<RowwiseIterator> iter;
+        res = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!res.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+                         << "]: " << res.to_string();
+            return Status::Error<ROWSET_READER_INIT>();
+        }
+
+        std::shared_ptr<vectorized::Block> block =
+                
std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
+        do {
+            block->clear_column_data();
+            res = iter->next_batch(block.get());
+            if (!res.ok()) {
+                if (res.is<END_OF_FILE>()) {
+                    break;
+                }
+                RETURN_NOT_OK_LOG(
+                        res, "failed to read next block when schema change for 
inverted index.");
+            }
+
+            // copy block
+            auto ref_block = *block;
+
+            // write inverted index
+            if (_write_inverted_index(iter->data_id(), &ref_block) != 
Status::OK()) {
+                res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>();
+                LOG(WARNING) << "failed to write block.";
+                return res;
+            }
+        } while (block->rows() != 0);
+    }
+
+    // finish write inverted index, flush data to compound file
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        for (auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            auto column_name = inverted_index.columns[0];
+            auto column = _tablet_schema->column(column_name);
+            auto index_id = inverted_index.index_id;
+            std::string writer_sign = fmt::format("{}_{}", i, index_id);
+            try {
+                if (_inverted_index_builders[writer_sign]) {
+                    _inverted_index_builders[writer_sign]->finish();
+                }
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+        }
+    }
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+
+    LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows();
+    return res;
+}
+
+Status SchemaChangeForInvertedIndex::_add_nullable(const std::string& 
column_name,
+                                                   const std::string& 
index_writer_sign,
+                                                   Field* field, const 
uint8_t* null_map,
+                                                   const uint8_t** ptr, size_t 
num_rows) {
+    size_t offset = 0;
+    auto next_run_step = [&]() {
+        size_t step = 1;
+        for (auto i = offset + 1; i < num_rows; ++i) {
+            if (null_map[offset] == null_map[i])
+                step++;
+            else
+                break;
+        }
+        return step;
+    };
+
+    try {
+        do {
+            auto step = next_run_step();
+            if (null_map[offset]) {
+                
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_nulls(step));
+                *ptr += field->size() * step;
+            } else {
+                if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+                    DCHECK(field->get_sub_field_count() == 1);
+                    const auto* col_cursor = reinterpret_cast<const 
CollectionValue*>(*ptr);
+                    
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
+                            field->get_sub_field(0)->size(), col_cursor, 
step));
+                } else {
+                    
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values(
+                            column_name, *ptr, step));
+                }
+            }
+            offset += step;
+        } while (offset < num_rows);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeForInvertedIndex::_add_data(const std::string& column_name,
+                                               const std::string& 
index_writer_sign, Field* field,
+                                               const uint8_t** ptr, size_t 
num_rows) {
+    try {
+        if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+            DCHECK(field->get_sub_field_count() == 1);
+            const auto* col_cursor = reinterpret_cast<const 
CollectionValue*>(*ptr);
+            
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values(
+                    field->get_sub_field(0)->size(), col_cursor, num_rows));
+        } else {
+            
RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values(
+                    column_name, *ptr, num_rows));
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeForInvertedIndex::_write_inverted_index(int32_t segment_idx,
+                                                           vectorized::Block* 
block) {
+    LOG(INFO) << "begin to write inverted index";

Review Comment:
   This is a verbose debug log; it should not be emitted at INFO level.



##########
be/src/vec/olap/olap_data_convertor.h:
##########
@@ -46,13 +46,16 @@ class IOlapColumnDataAccessor {
 
 class OlapBlockDataConvertor {
 public:
+    OlapBlockDataConvertor() = default;

Review Comment:
   Why not use OlapBlockDataConvertor(const TabletSchema* tablet_schema) instead?



##########
be/src/olap/schema_change.cpp:
##########
@@ -958,6 +1232,270 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     return res;
 }
 
+Status SchemaChangeHandler::_do_process_alter_inverted_index(
+        TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) {
+    Status res = Status::OK();
+    // TODO(wy): check whether the tablet's max continuous version == 
request.version
+    if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() 
== TABLET_STOPPED ||
+        tablet->tablet_state() == TABLET_SHUTDOWN) {
+        LOG(WARNING) << "tablet's state=" << tablet->tablet_state()
+                     << " cannot alter inverted index";
+        return Status::Error<ErrorCode::INTERNAL_ERROR>();
+    }
+
+    std::shared_lock base_migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
+        return Status::Error<TRY_LOCK_FAILED>();
+    }
+
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->copy_from(*tablet->tablet_schema());
+    if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
+        tablet_schema->clear_columns();
+        for (const auto& column : request.columns) {
+            tablet_schema->append_column(TabletColumn(column));
+        }
+    }
+
+    // get rowset reader
+    std::vector<RowsetReaderSharedPtr> rs_readers;
+    DeleteHandler delete_handler;
+    RETURN_IF_ERROR(
+            _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, 
&delete_handler));
+    if (request.__isset.is_drop_op && request.is_drop_op) {
+        // drop index
+        res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request);
+    } else {
+        // add index
+        res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, 
tablet, request);
+    }
+
+    if (!res.ok()) {
+        LOG(WARNING) << "failed to alter tablet. tablet=" << 
tablet->full_name();
+        return res;
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet,
+                                                const TabletSchemaSPtr& 
tablet_schema,
+                                                const TAlterInvertedIndexReq& 
request,
+                                                
std::vector<RowsetReaderSharedPtr>* rs_readers,
+                                                DeleteHandler* delete_handler) 
{
+    Status res = Status::OK();
+    std::vector<Version> versions_to_be_changed;
+    std::vector<ColumnId> return_columns;
+    std::vector<TOlapTableIndex> alter_inverted_indexs;
+
+    if (request.__isset.alter_inverted_indexes) {
+        alter_inverted_indexs = request.alter_inverted_indexes;
+    }
+
+    for (auto& inverted_index : alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // obtain base tablet's push lock and header write lock to prevent loading 
data
+    {
+        std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock());
+        std::lock_guard<std::shared_mutex> 
tablet_wlock(tablet->get_header_lock());
+
+        do {
+            RowsetSharedPtr max_rowset;
+            // get history data to rebuild inverted index and it will check if 
there is hold in base tablet
+            res = _get_versions_to_be_changed(tablet, &versions_to_be_changed, 
&max_rowset);
+            if (!res.ok()) {
+                LOG(WARNING) << "fail to get version to be rebuild inverted 
index. res=" << res;
+                break;
+            }
+
+            // should check the max_version >= request.alter_version, if not 
the rebuild index is useless
+            if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
+                LOG(WARNING) << "base tablet's max version="
+                             << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
+                             << " is less than request version=" << 
request.alter_version;
+                res = Status::InternalError(
+                        "base tablet's max version={} is less than request 
version={}",
+                        (max_rowset == nullptr ? 0 : 
max_rowset->end_version()),
+                        request.alter_version);
+                break;
+            }
+
+            // init one delete handler
+            int64_t end_version = -1;
+            for (auto& version : versions_to_be_changed) {
+                end_version = std::max(end_version, version.second);
+            }
+
+            auto& all_del_preds = tablet->delete_predicates();
+            for (auto& delete_pred : all_del_preds) {
+                if (delete_pred->version().first > end_version) {
+                    continue;
+                }
+                
tablet_schema->merge_dropped_columns(tablet->tablet_schema(delete_pred->version()));
+            }
+            res = delete_handler->init(tablet_schema, all_del_preds, 
end_version);
+            if (!res) {
+                LOG(WARNING) << "init delete handler failed. tablet=" << 
tablet->full_name()
+                             << ", end_version=" << end_version;
+                break;
+            }
+
+            // acquire data sources correspond to history versions
+            tablet->capture_rs_readers(versions_to_be_changed, rs_readers);
+            if (rs_readers->size() < 1) {
+                LOG(WARNING) << "fail to acquire all data sources. "
+                             << "version_num=" << versions_to_be_changed.size()
+                             << ", data_source_num=" << rs_readers->size();
+                res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>();
+                break;
+            }
+
+            // reader_context is stack variables, it's lifetime should keep 
the same with rs_readers
+            RowsetReaderContext reader_context;
+            reader_context.reader_type = READER_ALTER_TABLE;
+            reader_context.tablet_schema = tablet_schema;
+            reader_context.need_ordered_result = true;
+            reader_context.delete_handler = delete_handler;

Review Comment:
   If delete_handler filters out rows that are marked as deleted, the index
will not contain exactly the same rows as the segment data file, and MATCH
queries will then produce wrong results. This needs to be tested carefully.



##########
be/src/olap/schema_change.cpp:
##########
@@ -576,6 +583,248 @@ Status 
VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
     return Status::OK();
 }
 
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+        const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+        const TabletSchemaSPtr& tablet_schema)
+        : SchemaChange(),
+          _alter_inverted_indexs(alter_inverted_indexs),
+          _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+    VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr 
rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet,
+                                             TabletSchemaSPtr 
base_tablet_schema) {
+    Status res = Status::OK();
+    if (rowset_reader->rowset()->empty() || 
rowset_reader->rowset()->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    std::vector<ColumnId> return_columns;
+    for (auto& inverted_index : _alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        auto column_name = inverted_index.columns[0];
+        auto idx = _tablet_schema->field_index(column_name);
+        return_columns.emplace_back(idx);
+    }
+
+    // create inverted index writer
+    auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+    std::string segment_dir = base_tablet->tablet_path();
+    auto fs = rowset_meta->fs();
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        std::string segment_filename =
+                fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), 
i);
+        for (auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            auto column_name = inverted_index.columns[0];
+            auto column = _tablet_schema->column(column_name);
+            auto index_id = inverted_index.index_id;
+
+            std::unique_ptr<Field> field(FieldFactory::create(column));
+            _index_metas.emplace_back(new TabletIndex());
+            _index_metas.back()->init_from_thrift(inverted_index, 
*_tablet_schema);
+            std::unique_ptr<segment_v2::InvertedIndexColumnWriter> 
inverted_index_builder;
+            try {
+                RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+                        field.get(), &inverted_index_builder, 
segment_filename, segment_dir,
+                        _index_metas.back().get(), fs));
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+
+            if (inverted_index_builder) {
+                std::string writer_sign = fmt::format("{}_{}", i, index_id);

Review Comment:
   It's more efficient to use a pair as the key instead of a formatted string.
Furthermore, if the three loops are merged into one that processes the segments
one by one, index_id alone can be used as the key.



##########
be/src/olap/schema_change.cpp:
##########
@@ -576,6 +583,248 @@ Status 
VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
     return Status::OK();
 }
 
+// Stores the inverted indexes to build and the tablet schema used to resolve
+// their columns; no other work happens at construction time.
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+        const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+        const TabletSchemaSPtr& tablet_schema)
+        : _alter_inverted_indexs(alter_inverted_indexs), _tablet_schema(tablet_schema) {}
+
+// Releases the inverted index writers first and the TabletIndex metadata second.
+// process() hands each writer a raw pointer to its metadata
+// (_index_metas.back().get()); clearing in this explicit order keeps that
+// pointer valid for as long as any writer exists, independent of the order the
+// members are declared in. NOTE(review): assumes writers may retain that
+// pointer — confirm against InvertedIndexColumnWriter.
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+    VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+}
+
+// Builds the requested inverted indexes for every segment of one rowset.
+//
+// Steps:
+//  1. For each segment and each entry of _alter_inverted_indexs, create an
+//     InvertedIndexColumnWriter (stored in _inverted_index_builders under the
+//     key "<segment_idx>_<index_id>"), writing under base_tablet->tablet_path().
+//  2. Load the rowset's segments, read only the indexed columns block by block
+//     and hand every block to _write_inverted_index().
+//  3. Call finish() on every writer and propagate its status.
+//
+// rowset_writer, new_tablet and base_tablet_schema are not used by this
+// implementation.
+//
+// Returns OK for an empty rowset or when all indexes were written; otherwise
+// the first error encountered (IO_ERROR for exceptions thrown by the writers).
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet,
+                                             TabletSchemaSPtr base_tablet_schema) {
+    auto rowset = rowset_reader->rowset();
+    if (rowset->empty() || rowset->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    // Key under which a writer is stored in _inverted_index_builders; used both
+    // when the writers are created and when they are finished.
+    auto writer_sign_of = [](auto segment_idx, auto index_id) {
+        return fmt::format("{}_{}", segment_idx, index_id);
+    };
+
+    // Only the indexed columns are read back from the segments.
+    std::vector<ColumnId> return_columns;
+    return_columns.reserve(_alter_inverted_indexs.size());
+    for (const auto& inverted_index : _alter_inverted_indexs) {
+        DCHECK_EQ(inverted_index.columns.size(), 1);
+        return_columns.emplace_back(_tablet_schema->field_index(inverted_index.columns[0]));
+    }
+
+    // Create one inverted index writer per (segment, index) pair.
+    auto rowset_meta = rowset->rowset_meta();
+    std::string segment_dir = base_tablet->tablet_path();
+    auto fs = rowset_meta->fs();
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        std::string segment_filename =
+                fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), i);
+        for (const auto& inverted_index : _alter_inverted_indexs) {
+            DCHECK_EQ(inverted_index.columns.size(), 1);
+            const auto& column = _tablet_schema->column(inverted_index.columns[0]);
+
+            std::unique_ptr<Field> field(FieldFactory::create(column));
+            _index_metas.emplace_back(new TabletIndex());
+            _index_metas.back()->init_from_thrift(inverted_index, *_tablet_schema);
+            std::unique_ptr<segment_v2::InvertedIndexColumnWriter> inverted_index_builder;
+            try {
+                RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+                        field.get(), &inverted_index_builder, segment_filename, segment_dir,
+                        _index_metas.back().get(), fs));
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+
+            // Only register a writer if create() actually produced one.
+            if (inverted_index_builder) {
+                _inverted_index_builders.insert(std::make_pair(
+                        writer_sign_of(i, inverted_index.index_id),
+                        std::move(inverted_index_builder)));
+            }
+        }
+    }
+
+    // Load the rowset's segments.
+    SegmentCacheHandle segment_cache_handle;
+    RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+            std::static_pointer_cast<BetaRowset>(rowset), &segment_cache_handle, false));
+
+    // Read every segment block by block and feed the blocks to the writers.
+    StorageReadOptions read_options;
+    OlapReaderStatistics stats;
+    read_options.stats = &stats;
+    read_options.tablet_schema = _tablet_schema;
+    std::unique_ptr<Schema> schema =
+            std::make_unique<Schema>(_tablet_schema->columns(), return_columns);
+    for (auto& seg_ptr : segment_cache_handle.get_segments()) {
+        std::unique_ptr<RowwiseIterator> iter;
+        Status st = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+                         << "]: " << st.to_string();
+            return Status::Error<ROWSET_READER_INIT>();
+        }
+
+        std::shared_ptr<vectorized::Block> block =
+                std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
+        do {
+            block->clear_column_data();
+            st = iter->next_batch(block.get());
+            if (!st.ok()) {
+                if (st.is<END_OF_FILE>()) {
+                    // Segment exhausted: not an error.
+                    break;
+                }
+                RETURN_NOT_OK_LOG(
+                        st, "failed to read next block when schema change for inverted index.");
+            }
+
+            // _write_inverted_index() receives a mutable Block*, while the loop
+            // condition below still reads *block, so it works on a copy.
+            auto ref_block = *block;
+            Status write_st = _write_inverted_index(iter->data_id(), &ref_block);
+            if (!write_st.ok()) {
+                LOG(WARNING) << "failed to write block." << " error=" << write_st.to_string();
+                return Status::Error<SCHEMA_CHANGE_INFO_INVALID>();
+            }
+        } while (block->rows() != 0);
+    }
+
+    // Finish every writer so its data is flushed to the compound index file.
+    // finish()'s status is propagated so a failed flush is not reported as success,
+    // and find() is used so no null writer is default-inserted for a missing key.
+    for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+        for (const auto& inverted_index : _alter_inverted_indexs) {
+            auto it = _inverted_index_builders.find(writer_sign_of(i, inverted_index.index_id));
+            if (it == _inverted_index_builders.end() || !it->second) {
+                continue;
+            }
+            try {
+                RETURN_IF_ERROR(it->second->finish());
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "CLuceneError occured: " << e.what();
+                return Status::Error<IO_ERROR>();
+            }
+        }
+    }
+    _inverted_index_builders.clear();
+    _index_metas.clear();
+
+    LOG(INFO) << "all row nums. source_rows=" << rowset->num_rows();
+    // The read loop above leaves the last next_batch() status (normally
+    // END_OF_FILE) in `st`; that must not leak out as this function's result.
+    return Status::OK();
+}
+
+// Feeds `num_rows` rows of a nullable column to the inverted index writer
+// registered under `index_writer_sign`.
+//
+// A non-zero `null_map[i]` marks row i as null. Consecutive rows with the same
+// null flag are grouped into runs. Null runs go to add_nulls(); non-null runs go
+// to add_array_values() for ARRAY columns (values read as CollectionValue) or to
+// add_values() for every other type. `*ptr` points at the current row's value
+// slot and is advanced by field->size() * run_length after EVERY run, null or
+// not, so each non-null run reads its own values rather than the first run's.
+//
+// Returns OK for num_rows == 0, INTERNAL_ERROR if no writer is registered under
+// the sign, IO_ERROR if the writer throws, otherwise the writer's own status.
+Status SchemaChangeForInvertedIndex::_add_nullable(const std::string& column_name,
+                                                   const std::string& index_writer_sign,
+                                                   Field* field, const uint8_t* null_map,
+                                                   const uint8_t** ptr, size_t num_rows) {
+    // Without this guard the run loop would read null_map[0] on empty input.
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    // Look the writer up once; operator[] would insert and then dereference a
+    // null writer for an unknown sign.
+    auto builder_it = _inverted_index_builders.find(index_writer_sign);
+    if (builder_it == _inverted_index_builders.end() || !builder_it->second) {
+        LOG(WARNING) << "inverted index writer not found: " << index_writer_sign;
+        return Status::Error<ErrorCode::INTERNAL_ERROR>();
+    }
+    auto& builder = builder_it->second;
+
+    // Length of the run that starts at `offset` and shares null_map[offset].
+    size_t offset = 0;
+    auto next_run_step = [&]() {
+        size_t step = 1;
+        while (offset + step < num_rows && null_map[offset + step] == null_map[offset]) {
+            ++step;
+        }
+        return step;
+    };
+
+    try {
+        do {
+            auto step = next_run_step();
+            if (null_map[offset]) {
+                RETURN_IF_ERROR(builder->add_nulls(step));
+            } else if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+                DCHECK(field->get_sub_field_count() == 1);
+                const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr);
+                RETURN_IF_ERROR(builder->add_array_values(field->get_sub_field(0)->size(),
+                                                          col_cursor, step));
+            } else {
+                RETURN_IF_ERROR(builder->add_values(column_name, *ptr, step));
+            }
+            // Skip this run's value slots regardless of nullness.
+            *ptr += field->size() * step;
+            offset += step;
+        } while (offset < num_rows);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+// Feeds `num_rows` values starting at `*ptr` to the inverted index writer
+// registered under `index_writer_sign`, without consulting a null map.
+// ARRAY columns are passed as CollectionValue entries to add_array_values()
+// together with the size of their single sub-field. All other types go to
+// add_values(). `*ptr` is read but not advanced.
+//
+// Returns INTERNAL_ERROR if no writer is registered under the sign (instead of
+// dereferencing a default-inserted null writer), IO_ERROR if the writer throws,
+// otherwise the writer's own status.
+Status SchemaChangeForInvertedIndex::_add_data(const std::string& column_name,
+                                               const std::string& index_writer_sign, Field* field,
+                                               const uint8_t** ptr, size_t num_rows) {
+    auto builder_it = _inverted_index_builders.find(index_writer_sign);
+    if (builder_it == _inverted_index_builders.end() || !builder_it->second) {
+        LOG(WARNING) << "inverted index writer not found: " << index_writer_sign;
+        return Status::Error<ErrorCode::INTERNAL_ERROR>();
+    }
+    auto& builder = builder_it->second;
+
+    try {
+        if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
+            DCHECK(field->get_sub_field_count() == 1);
+            const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr);
+            RETURN_IF_ERROR(builder->add_array_values(field->get_sub_field(0)->size(), col_cursor,
+                                                      num_rows));
+        } else {
+            RETURN_IF_ERROR(builder->add_values(column_name, *ptr, num_rows));
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "CLuceneError occured: " << e.what();
+        return Status::Error<IO_ERROR>();
+    }
+
+    return Status::OK();
+}
+
+Status SchemaChangeForInvertedIndex::_write_inverted_index(int32_t segment_idx,
+                                                           vectorized::Block* 
block) {
+    LOG(INFO) << "begin to write inverted index";
+
+    // converter block data
+    vectorized::OlapBlockDataConvertor olap_data_convertor;

Review Comment:
   It's not efficient to create an olap_data_convertor for each block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to