chaoyli commented on a change in pull request #563: Modify schema change to use rowset URL: https://github.com/apache/incubator-doris/pull/563#discussion_r249622581
########## File path: be/src/olap/schema_change.cpp ########## @@ -1756,97 +1638,99 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( } // c. 转换数据 - ColumnData* olap_data = NULL; - for (vector<SegmentGroup*>::iterator it = ref_segment_groups->begin(); - it != ref_segment_groups->end(); ++it) { - ColumnData* olap_data = ColumnData::create(*it); - if (NULL == olap_data) { - OLAP_LOG_WARNING("fail to create ColumnData."); - res = OLAP_ERR_MALLOC_ERROR; - goto SCHEMA_VERSION_CONVERT_ERR; - } - - olap_data->init(); - - SegmentGroup* new_segment_group = nullptr; - if ((*it)->transaction_id() == 0) { - new_segment_group = new SegmentGroup(dest_tablet->tablet_id(), - 0, - dest_tablet->tablet_schema(), - dest_tablet->num_key_fields(), - dest_tablet->num_short_key_fields(), - dest_tablet->num_rows_per_row_block(), - dest_tablet->rowset_path_prefix(), - olap_data->version(), - olap_data->version_hash(), - olap_data->delete_flag(), - (*it)->segment_group_id(), 0); + for (vector<RowsetSharedPtr>::iterator it = ref_rowsets->begin(); + it != ref_rowsets->end(); ++it) { + RowsetReaderSharedPtr rowset_reader = (*it)->create_reader(); + rowset_reader->init(nullptr); + + RowsetBuilderContextBuilder context_builder; + RowsetId rowset_id = 0; + RowsetIdGenerator::instance()->get_next_id(dest_tablet->data_dir(), &rowset_id); + if ((*it)->is_pending()) { + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + context_builder.set_rowset_id(rowset_id) + .set_tablet_id(dest_tablet->tablet_id()) + .set_partition_id(dest_tablet->partition_id()) + .set_tablet_schema_hash(dest_tablet->schema_hash()) + .set_rowset_type(ALPHA_ROWSET) + .set_rowset_path_prefix(dest_tablet->tablet_path()) + .set_tablet_schema(dest_tablet->tablet_schema()) + .set_num_key_fields(dest_tablet->num_key_fields()) + .set_num_short_key_fields(dest_tablet->num_short_key_fields()) + .set_num_rows_per_row_block(dest_tablet->num_rows_per_row_block()) + .set_compress_kind(dest_tablet->compress_kind()) + .set_bloom_filter_fpp(dest_tablet->bloom_filter_fpp()) + .set_rowset_state(PREPARING) + .set_txn_id((*it)->txn_id()) + .set_load_id(load_id); } else { - new_segment_group = new SegmentGroup(dest_tablet->tablet_id(), - 0, - dest_tablet->tablet_schema(), - dest_tablet->num_key_fields(), - dest_tablet->num_short_key_fields(), - dest_tablet->num_rows_per_row_block(), - dest_tablet->rowset_path_prefix(), - olap_data->delete_flag(), - (*it)->segment_group_id(), 0, - (*it)->is_pending(), - (*it)->partition_id(), - (*it)->transaction_id()); - } - - if (NULL == new_segment_group) { - LOG(FATAL) << "failed to malloc SegmentGroup. size=" << sizeof(SegmentGroup); - res = OLAP_ERR_MALLOC_ERROR; - goto SCHEMA_VERSION_CONVERT_ERR; - } - - new_segment_groups->push_back(new_segment_group); - - if (!sc_procedure->process(olap_data, new_segment_group, dest_tablet)) { + context_builder.set_rowset_id(rowset_id) + .set_tablet_id(dest_tablet->tablet_id()) + .set_partition_id(dest_tablet->partition_id()) + .set_tablet_schema_hash(dest_tablet->schema_hash()) + .set_rowset_type(ALPHA_ROWSET) + .set_rowset_path_prefix(dest_tablet->tablet_path()) + .set_tablet_schema(dest_tablet->tablet_schema()) + .set_num_key_fields(dest_tablet->num_key_fields()) + .set_num_short_key_fields(dest_tablet->num_short_key_fields()) + .set_num_rows_per_row_block(dest_tablet->num_rows_per_row_block()) + .set_compress_kind(dest_tablet->compress_kind()) + .set_bloom_filter_fpp(dest_tablet->bloom_filter_fpp()) + .set_rowset_state(VISIBLE) + .set_version((*it)->version()) + .set_version_hash((*it)->version_hash()); + } + RowsetBuilderContext context = context_builder.build(); + RowsetBuilderSharedPtr rowset_builder(new AlphaRowsetBuilder()); + rowset_builder->init(context); + + if (!sc_procedure->process(rowset_reader, rowset_builder, dest_tablet)) { if ((*it)->is_pending()) { - OLAP_LOG_WARNING("failed to process the transaction when schema change. " - "[tablet='%s' transaction=%ld]", - dest_tablet->full_name().c_str(), - (*it)->transaction_id()); + LOG(WARNING) << "failed to process the transaction when schema change. " + << "[tablet='" << dest_tablet->full_name() << "'" + << " transaction="<< (*it)->txn_id() << "]"; } else { - OLAP_LOG_WARNING("failed to process the version. [version='%d-%d']", - (*it)->version().first, - (*it)->version().second); + LOG(WARNING) << "failed to process the version. " + << "[version='" << (*it)->version().first + << "-" << (*it)->version().second << "']"; } + rowset_builder->release(); res = OLAP_ERR_INPUT_PARAMETER_ERROR; goto SCHEMA_VERSION_CONVERT_ERR; } - - SAFE_DELETE(olap_data); + RowsetSharedPtr new_rowset = rowset_builder->build(); + if (new_rowset == nullptr) { + LOG(WARNING) << "build rowset failed."; + res = OLAP_ERR_MALLOC_ERROR; + goto SCHEMA_VERSION_CONVERT_ERR; + } + new_rowsets->push_back(new_rowset); } SAFE_DELETE(sc_procedure); - SAFE_DELETE(olap_data); return res; SCHEMA_VERSION_CONVERT_ERR: - while (!new_segment_groups->empty()) { - SegmentGroup* segment_group = new_segment_groups->back(); - segment_group->delete_all_files(); - SAFE_DELETE(segment_group); - new_segment_groups->pop_back(); + while (!new_rowsets->empty()) { + RowsetSharedPtr new_rowset = new_rowsets->back(); + new_rowset->remove(); + new_rowsets->pop_back(); } SAFE_DELETE(sc_procedure); - SAFE_DELETE(olap_data); return res; } OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed( TabletSharedPtr ref_tablet, vector<Version>& versions_to_be_changed) { int32_t request_version = 0; - const PDelta* lastest_version = ref_tablet->lastest_version(); - if (lastest_version != NULL) { - request_version = lastest_version->end_version() - 1; + RowsetSharedPtr rowset = ref_tablet->rowset_with_max_version(); + if (rowset != NULL) { + request_version = rowset->version().second - 1; } else { OLAP_LOG_WARNING("Table has no version. [path='%s']", Review comment: LOG(WARNING) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org