xiaokang commented on code in PR #16371: URL: https://github.com/apache/doris/pull/16371#discussion_r1095475224
########## be/src/olap/schema_change.cpp: ########## @@ -958,6 +1232,270 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& return res; } +Status SchemaChangeHandler::_do_process_alter_inverted_index( + TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) { + Status res = Status::OK(); + // TODO(wy): check whether the tablet's max continuous version == request.version + if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() == TABLET_STOPPED || + tablet->tablet_state() == TABLET_SHUTDOWN) { + LOG(WARNING) << "tablet's state=" << tablet->tablet_state() + << " cannot alter inverted index"; + return Status::Error<ErrorCode::INTERNAL_ERROR>(); + } + + std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { + return Status::Error<TRY_LOCK_FAILED>(); + } + + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->copy_from(*tablet->tablet_schema()); + if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) { + tablet_schema->clear_columns(); + for (const auto& column : request.columns) { + tablet_schema->append_column(TabletColumn(column)); + } + } + + // get rowset reader + std::vector<RowsetReaderSharedPtr> rs_readers; + DeleteHandler delete_handler; + RETURN_IF_ERROR( + _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, &delete_handler)); + if (request.__isset.is_drop_op && request.is_drop_op) { + // drop index + res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request); + } else { + // add index + res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, tablet, request); + } + + if (!res.ok()) { + LOG(WARNING) << "failed to alter tablet. 
tablet=" << tablet->full_name(); + return res; + } + + return Status::OK(); +} + +Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet, + const TabletSchemaSPtr& tablet_schema, + const TAlterInvertedIndexReq& request, + std::vector<RowsetReaderSharedPtr>* rs_readers, + DeleteHandler* delete_handler) { + Status res = Status::OK(); + std::vector<Version> versions_to_be_changed; + std::vector<ColumnId> return_columns; + std::vector<TOlapTableIndex> alter_inverted_indexs; + + if (request.__isset.alter_inverted_indexes) { + alter_inverted_indexs = request.alter_inverted_indexes; + } + + for (auto& inverted_index : alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // obtain base tablet's push lock and header write lock to prevent loading data + { + std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock()); + std::lock_guard<std::shared_mutex> tablet_wlock(tablet->get_header_lock()); + + do { Review Comment: Is there an existing function that can be reused here to create rs_readers?
########## be/src/olap/task/engine_alter_tablet_task.cpp: ########## @@ -45,4 +45,34 @@ Status EngineAlterTabletTask::execute() { return res; } // execute +EngineAlterInvertedIndexTask::EngineAlterInvertedIndexTask( + const TAlterInvertedIndexReq& alter_inverted_index_request) + : _alter_inverted_index_req(alter_inverted_index_request) { + _mem_tracker = std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::SCHEMA_CHANGE, + fmt::format("EngineAlterInvertedIndexTask#tabletId={}", + std::to_string(_alter_inverted_index_req.tablet_id)), + config::memory_limitation_per_thread_for_schema_change_bytes); +} + +Status EngineAlterInvertedIndexTask::execute() { + SCOPED_ATTACH_TASK(_mem_tracker); + DorisMetrics::instance()->create_rollup_requests_total->increment(1); Review Comment: metric name is not suitable ########## be/src/olap/schema_change.cpp: ########## @@ -958,6 +1232,270 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& return res; } +Status SchemaChangeHandler::_do_process_alter_inverted_index( + TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) { + Status res = Status::OK(); + // TODO(wy): check whether the tablet's max continuous version == request.version + if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() == TABLET_STOPPED || + tablet->tablet_state() == TABLET_SHUTDOWN) { + LOG(WARNING) << "tablet's state=" << tablet->tablet_state() + << " cannot alter inverted index"; + return Status::Error<ErrorCode::INTERNAL_ERROR>(); + } + + std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { + return Status::Error<TRY_LOCK_FAILED>(); + } + + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->copy_from(*tablet->tablet_schema()); + if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) { + tablet_schema->clear_columns(); + for (const auto& column : request.columns) { + 
tablet_schema->append_column(TabletColumn(column)); + } + } + + // get rowset reader + std::vector<RowsetReaderSharedPtr> rs_readers; + DeleteHandler delete_handler; + RETURN_IF_ERROR( + _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, &delete_handler)); + if (request.__isset.is_drop_op && request.is_drop_op) { + // drop index + res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request); + } else { + // add index + res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, tablet, request); + } + + if (!res.ok()) { + LOG(WARNING) << "failed to alter tablet. tablet=" << tablet->full_name(); + return res; + } + + return Status::OK(); +} + +Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet, + const TabletSchemaSPtr& tablet_schema, + const TAlterInvertedIndexReq& request, + std::vector<RowsetReaderSharedPtr>* rs_readers, + DeleteHandler* delete_handler) { + Status res = Status::OK(); + std::vector<Version> versions_to_be_changed; + std::vector<ColumnId> return_columns; + std::vector<TOlapTableIndex> alter_inverted_indexs; + + if (request.__isset.alter_inverted_indexes) { + alter_inverted_indexs = request.alter_inverted_indexes; + } + + for (auto& inverted_index : alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // obtain base tablet's push lock and header write lock to prevent loading data + { + std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock()); + std::lock_guard<std::shared_mutex> tablet_wlock(tablet->get_header_lock()); + + do { + RowsetSharedPtr max_rowset; + // get history data to rebuild inverted index and it will check if there is hold in base tablet + res = _get_versions_to_be_changed(tablet, &versions_to_be_changed, &max_rowset); + if (!res.ok()) { + LOG(WARNING) << "fail to get version to be rebuild inverted index. 
res=" << res; + break; + } + + // should check the max_version >= request.alter_version, if not the rebuild index is useless + if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + << " is less than request version=" << request.alter_version; + res = Status::InternalError( + "base tablet's max version={} is less than request version={}", + (max_rowset == nullptr ? 0 : max_rowset->end_version()), + request.alter_version); + break; + } + + // init one delete handler + int64_t end_version = -1; + for (auto& version : versions_to_be_changed) { + end_version = std::max(end_version, version.second); + } + + auto& all_del_preds = tablet->delete_predicates(); + for (auto& delete_pred : all_del_preds) { + if (delete_pred->version().first > end_version) { + continue; + } + tablet_schema->merge_dropped_columns(tablet->tablet_schema(delete_pred->version())); + } + res = delete_handler->init(tablet_schema, all_del_preds, end_version); + if (!res) { + LOG(WARNING) << "init delete handler failed. tablet=" << tablet->full_name() + << ", end_version=" << end_version; + break; + } + + // acquire data sources correspond to history versions + tablet->capture_rs_readers(versions_to_be_changed, rs_readers); + if (rs_readers->size() < 1) { + LOG(WARNING) << "fail to acquire all data sources. " + << "version_num=" << versions_to_be_changed.size() + << ", data_source_num=" << rs_readers->size(); + res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(); + break; + } + + // reader_context is stack variables, it's lifetime should keep the same with rs_readers + RowsetReaderContext reader_context; + reader_context.reader_type = READER_ALTER_TABLE; + reader_context.tablet_schema = tablet_schema; + reader_context.need_ordered_result = true; Review Comment: index should keep consistent with segment data file row by row. 
So is it correct to set need_ordered_result = true? ########## be/src/olap/schema_change.cpp: ########## @@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_ return Status::OK(); } +SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex( + const std::vector<TOlapTableIndex>& alter_inverted_indexs, + const TabletSchemaSPtr& tablet_schema) + : SchemaChange(), + _alter_inverted_indexs(alter_inverted_indexs), + _tablet_schema(tablet_schema) {} + +SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() { + VLOG_NOTICE << "~SchemaChangeForInvertedIndex()"; + _inverted_index_builders.clear(); + _index_metas.clear(); +} + +Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet, + TabletSchemaSPtr base_tablet_schema) { + Status res = Status::OK(); + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + return Status::OK(); + } + + std::vector<ColumnId> return_columns; + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = _tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // create inverted index writer + auto rowset_meta = rowset_reader->rowset()->rowset_meta(); + std::string segment_dir = base_tablet->tablet_path(); + auto fs = rowset_meta->fs(); + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { Review Comment: The three loops over segments can be merged into one. That way, fewer _inverted_index_builders will be open at the same time.
########## be/src/olap/schema_change.cpp: ########## @@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_ return Status::OK(); } +SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex( + const std::vector<TOlapTableIndex>& alter_inverted_indexs, + const TabletSchemaSPtr& tablet_schema) + : SchemaChange(), + _alter_inverted_indexs(alter_inverted_indexs), + _tablet_schema(tablet_schema) {} + +SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() { + VLOG_NOTICE << "~SchemaChangeForInvertedIndex()"; + _inverted_index_builders.clear(); + _index_metas.clear(); +} + +Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet, + TabletSchemaSPtr base_tablet_schema) { + Status res = Status::OK(); + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + return Status::OK(); + } + + std::vector<ColumnId> return_columns; + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = _tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // create inverted index writer + auto rowset_meta = rowset_reader->rowset()->rowset_meta(); + std::string segment_dir = base_tablet->tablet_path(); + auto fs = rowset_meta->fs(); + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + std::string segment_filename = + fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), i); + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + + std::unique_ptr<Field> field(FieldFactory::create(column)); + _index_metas.emplace_back(new TabletIndex()); + 
_index_metas.back()->init_from_thrift(inverted_index, *_tablet_schema); + std::unique_ptr<segment_v2::InvertedIndexColumnWriter> inverted_index_builder; + try { + RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create( + field.get(), &inverted_index_builder, segment_filename, segment_dir, + _index_metas.back().get(), fs)); + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + if (inverted_index_builder) { + std::string writer_sign = fmt::format("{}_{}", i, index_id); + _inverted_index_builders.insert( + std::make_pair(writer_sign, std::move(inverted_index_builder))); + } + } + } + + SegmentCacheHandle segment_cache_handle; + // load segments + RETURN_NOT_OK(SegmentLoader::instance()->load_segments( + std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()), &segment_cache_handle, + false)); + + // create iterator for each segment + StorageReadOptions read_options; + OlapReaderStatistics stats; + read_options.stats = &stats; + read_options.tablet_schema = _tablet_schema; + std::unique_ptr<Schema> schema = + std::make_unique<Schema>(_tablet_schema->columns(), return_columns); + for (auto& seg_ptr : segment_cache_handle.get_segments()) { + std::unique_ptr<RowwiseIterator> iter; + res = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!res.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() + << "]: " << res.to_string(); + return Status::Error<ROWSET_READER_INIT>(); + } + + std::shared_ptr<vectorized::Block> block = + std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns)); + do { + block->clear_column_data(); + res = iter->next_batch(block.get()); + if (!res.ok()) { + if (res.is<END_OF_FILE>()) { + break; + } + RETURN_NOT_OK_LOG( + res, "failed to read next block when schema change for inverted index."); + } + + // copy block + auto ref_block = *block; + + // write inverted index + if 
(_write_inverted_index(iter->data_id(), &ref_block) != Status::OK()) { + res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>(); + LOG(WARNING) << "failed to write block."; + return res; + } + } while (block->rows() != 0); + } + + // finish write inverted index, flush data to compound file + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + std::string writer_sign = fmt::format("{}_{}", i, index_id); + try { + if (_inverted_index_builders[writer_sign]) { + _inverted_index_builders[writer_sign]->finish(); + } + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + } + } + _inverted_index_builders.clear(); + _index_metas.clear(); + + LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows(); + return res; +} + +Status SchemaChangeForInvertedIndex::_add_nullable(const std::string& column_name, + const std::string& index_writer_sign, + Field* field, const uint8_t* null_map, + const uint8_t** ptr, size_t num_rows) { + size_t offset = 0; + auto next_run_step = [&]() { + size_t step = 1; + for (auto i = offset + 1; i < num_rows; ++i) { + if (null_map[offset] == null_map[i]) + step++; + else + break; + } + return step; + }; + + try { + do { + auto step = next_run_step(); + if (null_map[offset]) { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_nulls(step)); + *ptr += field->size() * step; + } else { + if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { + DCHECK(field->get_sub_field_count() == 1); + const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values( + field->get_sub_field(0)->size(), col_cursor, 
step)); + } else { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values( + column_name, *ptr, step)); + } + } + offset += step; + } while (offset < num_rows); + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + return Status::OK(); +} + +Status SchemaChangeForInvertedIndex::_add_data(const std::string& column_name, + const std::string& index_writer_sign, Field* field, + const uint8_t** ptr, size_t num_rows) { + try { + if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { + DCHECK(field->get_sub_field_count() == 1); + const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values( + field->get_sub_field(0)->size(), col_cursor, num_rows)); + } else { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values( + column_name, *ptr, num_rows)); + } + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + return Status::OK(); +} + +Status SchemaChangeForInvertedIndex::_write_inverted_index(int32_t segment_idx, + vectorized::Block* block) { + LOG(INFO) << "begin to write inverted index"; Review Comment: verbose debug log ########## be/src/vec/olap/olap_data_convertor.h: ########## @@ -46,13 +46,16 @@ class IOlapColumnDataAccessor { class OlapBlockDataConvertor { public: + OlapBlockDataConvertor() = default; Review Comment: why not use OlapBlockDataConvertor(const TabletSchema* tablet_schema)? 
########## be/src/olap/schema_change.cpp: ########## @@ -958,6 +1232,270 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& return res; } +Status SchemaChangeHandler::_do_process_alter_inverted_index( + TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) { + Status res = Status::OK(); + // TODO(wy): check whether the tablet's max continuous version == request.version + if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() == TABLET_STOPPED || + tablet->tablet_state() == TABLET_SHUTDOWN) { + LOG(WARNING) << "tablet's state=" << tablet->tablet_state() + << " cannot alter inverted index"; + return Status::Error<ErrorCode::INTERNAL_ERROR>(); + } + + std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { + return Status::Error<TRY_LOCK_FAILED>(); + } + + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->copy_from(*tablet->tablet_schema()); + if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) { + tablet_schema->clear_columns(); + for (const auto& column : request.columns) { + tablet_schema->append_column(TabletColumn(column)); + } + } + + // get rowset reader + std::vector<RowsetReaderSharedPtr> rs_readers; + DeleteHandler delete_handler; + RETURN_IF_ERROR( + _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, &delete_handler)); + if (request.__isset.is_drop_op && request.is_drop_op) { + // drop index + res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request); + } else { + // add index + res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, tablet, request); + } + + if (!res.ok()) { + LOG(WARNING) << "failed to alter tablet. 
tablet=" << tablet->full_name(); + return res; + } + + return Status::OK(); +} + +Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet, + const TabletSchemaSPtr& tablet_schema, + const TAlterInvertedIndexReq& request, + std::vector<RowsetReaderSharedPtr>* rs_readers, + DeleteHandler* delete_handler) { + Status res = Status::OK(); + std::vector<Version> versions_to_be_changed; + std::vector<ColumnId> return_columns; + std::vector<TOlapTableIndex> alter_inverted_indexs; + + if (request.__isset.alter_inverted_indexes) { + alter_inverted_indexs = request.alter_inverted_indexes; + } + + for (auto& inverted_index : alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // obtain base tablet's push lock and header write lock to prevent loading data + { + std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock()); + std::lock_guard<std::shared_mutex> tablet_wlock(tablet->get_header_lock()); + + do { + RowsetSharedPtr max_rowset; + // get history data to rebuild inverted index and it will check if there is hold in base tablet + res = _get_versions_to_be_changed(tablet, &versions_to_be_changed, &max_rowset); + if (!res.ok()) { + LOG(WARNING) << "fail to get version to be rebuild inverted index. res=" << res; + break; + } + + // should check the max_version >= request.alter_version, if not the rebuild index is useless + if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + << " is less than request version=" << request.alter_version; + res = Status::InternalError( + "base tablet's max version={} is less than request version={}", + (max_rowset == nullptr ? 
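0 : max_rowset->end_version()), + request.alter_version); + break; + } + + // init one delete handler + int64_t end_version = -1; + for (auto& version : versions_to_be_changed) { + end_version = std::max(end_version, version.second); + } + + auto& all_del_preds = tablet->delete_predicates(); + for (auto& delete_pred : all_del_preds) { + if (delete_pred->version().first > end_version) { + continue; + } + tablet_schema->merge_dropped_columns(tablet->tablet_schema(delete_pred->version())); + } + res = delete_handler->init(tablet_schema, all_del_preds, end_version); + if (!res) { + LOG(WARNING) << "init delete handler failed. tablet=" << tablet->full_name() + << ", end_version=" << end_version; + break; + } + + // acquire data sources correspond to history versions + tablet->capture_rs_readers(versions_to_be_changed, rs_readers); + if (rs_readers->size() < 1) { + LOG(WARNING) << "fail to acquire all data sources. " + << "version_num=" << versions_to_be_changed.size() + << ", data_source_num=" << rs_readers->size(); + res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(); + break; + } + + // reader_context is stack variables, it's lifetime should keep the same with rs_readers + RowsetReaderContext reader_context; + reader_context.reader_type = READER_ALTER_TABLE; + reader_context.tablet_schema = tablet_schema; + reader_context.need_ordered_result = true; + reader_context.delete_handler = delete_handler; Review Comment: If delete_handler filters out rows that are marked as deleted, the index will not contain exactly the same rows as the segment data file. MATCH queries will then produce wrong results. This needs to be tested carefully. ########## be/src/olap/schema_change.cpp: ########## @@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_ return Status::OK(); } +SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex( + const std::vector<TOlapTableIndex>& alter_inverted_indexs, + const TabletSchemaSPtr& tablet_schema) + : SchemaChange(), + _alter_inverted_indexs(alter_inverted_indexs), + _tablet_schema(tablet_schema) {} + +SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() { + VLOG_NOTICE << "~SchemaChangeForInvertedIndex()"; + _inverted_index_builders.clear(); + _index_metas.clear(); +} + +Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet, + TabletSchemaSPtr base_tablet_schema) { + Status res = Status::OK(); + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + return Status::OK(); + } + + std::vector<ColumnId> return_columns; + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = _tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // create inverted index writer + auto rowset_meta = rowset_reader->rowset()->rowset_meta(); + std::string segment_dir = base_tablet->tablet_path(); + auto fs = rowset_meta->fs(); + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + std::string segment_filename = + fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), i); + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + + std::unique_ptr<Field> field(FieldFactory::create(column)); + _index_metas.emplace_back(new TabletIndex()); + 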
########## be/src/olap/schema_change.cpp: ########## @@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_ return Status::OK(); } +SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex( + const std::vector<TOlapTableIndex>& alter_inverted_indexs, + const TabletSchemaSPtr& tablet_schema) + : SchemaChange(), + _alter_inverted_indexs(alter_inverted_indexs), + _tablet_schema(tablet_schema) {} + +SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() { + VLOG_NOTICE << "~SchemaChangeForInvertedIndex()"; + _inverted_index_builders.clear(); + _index_metas.clear(); +} + +Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet, + TabletSchemaSPtr base_tablet_schema) { + Status res = Status::OK(); + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + return Status::OK(); + } + + std::vector<ColumnId> return_columns; + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = _tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // create inverted index writer + auto rowset_meta = rowset_reader->rowset()->rowset_meta(); + std::string segment_dir = base_tablet->tablet_path(); + auto fs = rowset_meta->fs(); + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + std::string segment_filename = + fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), i); + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + + std::unique_ptr<Field> field(FieldFactory::create(column)); + _index_metas.emplace_back(new TabletIndex()); + 
_index_metas.back()->init_from_thrift(inverted_index, *_tablet_schema); + std::unique_ptr<segment_v2::InvertedIndexColumnWriter> inverted_index_builder; + try { + RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create( + field.get(), &inverted_index_builder, segment_filename, segment_dir, + _index_metas.back().get(), fs)); + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + if (inverted_index_builder) { + std::string writer_sign = fmt::format("{}_{}", i, index_id); Review Comment: It's more efficient to use a std::pair as the key instead of a formatted string. Furthermore, if the three loops are merged into one and the segments are processed one by one, index_id alone can be used as the key. ########## be/src/olap/schema_change.cpp: ########## @@ -576,6 +583,248 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_ return Status::OK(); } +SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex( + const std::vector<TOlapTableIndex>& alter_inverted_indexs, + const TabletSchemaSPtr& tablet_schema) + : SchemaChange(), + _alter_inverted_indexs(alter_inverted_indexs), + _tablet_schema(tablet_schema) {} + +SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() { + VLOG_NOTICE << "~SchemaChangeForInvertedIndex()"; + _inverted_index_builders.clear(); + _index_metas.clear(); +} + +Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet, + TabletSchemaSPtr base_tablet_schema) { + Status res = Status::OK(); + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + return Status::OK(); + } + + std::vector<ColumnId> return_columns; + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = _tablet_schema->field_index(column_name); + 
return_columns.emplace_back(idx); + } + + // create inverted index writer + auto rowset_meta = rowset_reader->rowset()->rowset_meta(); + std::string segment_dir = base_tablet->tablet_path(); + auto fs = rowset_meta->fs(); + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + std::string segment_filename = + fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(), i); + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + + std::unique_ptr<Field> field(FieldFactory::create(column)); + _index_metas.emplace_back(new TabletIndex()); + _index_metas.back()->init_from_thrift(inverted_index, *_tablet_schema); + std::unique_ptr<segment_v2::InvertedIndexColumnWriter> inverted_index_builder; + try { + RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create( + field.get(), &inverted_index_builder, segment_filename, segment_dir, + _index_metas.back().get(), fs)); + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + if (inverted_index_builder) { + std::string writer_sign = fmt::format("{}_{}", i, index_id); + _inverted_index_builders.insert( + std::make_pair(writer_sign, std::move(inverted_index_builder))); + } + } + } + + SegmentCacheHandle segment_cache_handle; + // load segments + RETURN_NOT_OK(SegmentLoader::instance()->load_segments( + std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()), &segment_cache_handle, + false)); + + // create iterator for each segment + StorageReadOptions read_options; + OlapReaderStatistics stats; + read_options.stats = &stats; + read_options.tablet_schema = _tablet_schema; + std::unique_ptr<Schema> schema = + std::make_unique<Schema>(_tablet_schema->columns(), return_columns); + for (auto& seg_ptr : segment_cache_handle.get_segments()) { + 
std::unique_ptr<RowwiseIterator> iter; + res = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!res.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() + << "]: " << res.to_string(); + return Status::Error<ROWSET_READER_INIT>(); + } + + std::shared_ptr<vectorized::Block> block = + std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns)); + do { + block->clear_column_data(); + res = iter->next_batch(block.get()); + if (!res.ok()) { + if (res.is<END_OF_FILE>()) { + break; + } + RETURN_NOT_OK_LOG( + res, "failed to read next block when schema change for inverted index."); + } + + // copy block + auto ref_block = *block; + + // write inverted index + if (_write_inverted_index(iter->data_id(), &ref_block) != Status::OK()) { + res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>(); + LOG(WARNING) << "failed to write block."; + return res; + } + } while (block->rows() != 0); + } + + // finish write inverted index, flush data to compound file + for (auto i = 0; i < rowset_meta->num_segments(); ++i) { + for (auto& inverted_index : _alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto column = _tablet_schema->column(column_name); + auto index_id = inverted_index.index_id; + std::string writer_sign = fmt::format("{}_{}", i, index_id); + try { + if (_inverted_index_builders[writer_sign]) { + _inverted_index_builders[writer_sign]->finish(); + } + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + } + } + _inverted_index_builders.clear(); + _index_metas.clear(); + + LOG(INFO) << "all row nums. 
source_rows=" << rowset_reader->rowset()->num_rows(); + return res; +} + +Status SchemaChangeForInvertedIndex::_add_nullable(const std::string& column_name, + const std::string& index_writer_sign, + Field* field, const uint8_t* null_map, + const uint8_t** ptr, size_t num_rows) { + size_t offset = 0; + auto next_run_step = [&]() { + size_t step = 1; + for (auto i = offset + 1; i < num_rows; ++i) { + if (null_map[offset] == null_map[i]) + step++; + else + break; + } + return step; + }; + + try { + do { + auto step = next_run_step(); + if (null_map[offset]) { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_nulls(step)); + *ptr += field->size() * step; + } else { + if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { + DCHECK(field->get_sub_field_count() == 1); + const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values( + field->get_sub_field(0)->size(), col_cursor, step)); + } else { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values( + column_name, *ptr, step)); + } + } + offset += step; + } while (offset < num_rows); + } catch (const std::exception& e) { + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + return Status::OK(); +} + +Status SchemaChangeForInvertedIndex::_add_data(const std::string& column_name, + const std::string& index_writer_sign, Field* field, + const uint8_t** ptr, size_t num_rows) { + try { + if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { + DCHECK(field->get_sub_field_count() == 1); + const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_array_values( + field->get_sub_field(0)->size(), col_cursor, num_rows)); + } else { + RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values( + column_name, *ptr, num_rows)); + } + } catch (const std::exception& e) 
{ + LOG(WARNING) << "CLuceneError occured: " << e.what(); + return Status::Error<IO_ERROR>(); + } + + return Status::OK(); +} + +Status SchemaChangeForInvertedIndex::_write_inverted_index(int32_t segment_idx, + vectorized::Block* block) { + LOG(INFO) << "begin to write inverted index"; + + // converter block data + vectorized::OlapBlockDataConvertor olap_data_convertor; Review Comment: It's not efficient to create an olap_data_convertor for each block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org