caiconghui commented on a change in pull request #8387: URL: https://github.com/apache/incubator-doris/pull/8387#discussion_r821481180
########## File path: be/src/olap/schema_change.cpp ########## @@ -1490,133 +1493,131 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe << " base_tablet=" << base_tablet->full_name() << " new_tablet=" << new_tablet->full_name(); - ReadLock base_migration_rlock(base_tablet->get_migration_lock_ptr(), TRY_LOCK); - if (!base_migration_rlock.own_lock()) { + ReadLock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - ReadLock new_migration_rlock(new_tablet->get_migration_lock_ptr(), TRY_LOCK); - if (!new_migration_rlock.own_lock()) { + ReadLock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock); + if (!new_migration_rlock.owns_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - // begin to find deltas to convert from base tablet to new tablet so that - // obtain base tablet and new tablet's push lock and header write lock to prevent loading data - base_tablet->obtain_push_lock(); - new_tablet->obtain_push_lock(); - base_tablet->obtain_header_wrlock(); - new_tablet->obtain_header_wrlock(); - - // check if the tablet has alter task - // if it has alter task, it means it is under old alter process - std::vector<Version> versions_to_be_changed; std::vector<RowsetReaderSharedPtr> rs_readers; // delete handlers for new tablet DeleteHandler delete_handler; std::vector<ColumnId> return_columns; - size_t num_cols = base_tablet->tablet_schema().num_columns(); - return_columns.resize(num_cols); - for (int i = 0; i < num_cols; ++i) { - return_columns[i] = i; - } - // reader_context is stack variables, it's lifetime should keep the same - // with rs_readers - RowsetReaderContext reader_context; - reader_context.reader_type = READER_ALTER_TABLE; - reader_context.tablet_schema = &base_tablet->tablet_schema(); - reader_context.need_ordered_result = true; - reader_context.delete_handler = &delete_handler; - reader_context.return_columns = &return_columns; - // for schema change, seek_columns is the same to return_columns - reader_context.seek_columns = &return_columns; - reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); - - auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" - + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK); - - do { - // get history data to be converted and it will check if there is hold in base tablet - res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to get version to be changed. res=" << res; - break; - } - - // should check the max_version >= request.alter_version, if not the convert is useless - RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); - if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { - LOG(WARNING) << "base tablet's max version=" - << (max_rowset == nullptr ? 0 : max_rowset->end_version()) - << " is less than request version=" << request.alter_version; - res = OLAP_ERR_VERSION_NOT_EXIST; - break; - } - // before calculating version_to_be_changed, - // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) - LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite." - << " new_tablet=" << new_tablet->full_name(); - std::vector<RowsetSharedPtr> rowsets_to_delete; - std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; - new_tablet->acquire_version_and_rowsets(&version_rowsets); - for (auto& pair : version_rowsets) { - if (pair.first.second <= max_rowset->end_version()) { - rowsets_to_delete.push_back(pair.second); + // begin to find deltas to convert from base tablet to new tablet so that + // obtain base tablet and new tablet's push lock and header write lock to prevent loading data + base_tablet->obtain_push_lock(); + new_tablet->obtain_push_lock(); + { + WriteLock base_tablet_rdlock(base_tablet->get_header_lock()); + WriteLock new_tablet_rdlock(new_tablet->get_header_lock()); + // check if the tablet has alter task + // if it has alter task, it means it is under old alter process + size_t num_cols = base_tablet->tablet_schema().num_columns(); + return_columns.resize(num_cols); + for (int i = 0; i < num_cols; ++i) { + return_columns[i] = i; + } + + // reader_context is stack variables, it's lifetime should keep the same + // with rs_readers + RowsetReaderContext reader_context; + reader_context.reader_type = READER_ALTER_TABLE; + reader_context.tablet_schema = &base_tablet->tablet_schema(); + reader_context.need_ordered_result = true; + reader_context.delete_handler = &delete_handler; + reader_context.return_columns = &return_columns; + // for schema change, seek_columns is the same to return_columns + reader_context.seek_columns = &return_columns; + reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); + + auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" + + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK); + + do { + // get history data to be converted and it will check if there is hold in base tablet + res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to get version to be changed. res=" << res; + break; } - } - std::vector<RowsetSharedPtr> empty_vec; - new_tablet->modify_rowsets(empty_vec, rowsets_to_delete); - // inherit cumulative_layer_point from base_tablet - // check if new_tablet.ce_point > base_tablet.ce_point? - new_tablet->set_cumulative_layer_point(-1); - // save tablet meta - new_tablet->save_meta(); - for (auto& rowset : rowsets_to_delete) { - // do not call rowset.remove directly, using gc thread to delete it - StorageEngine::instance()->add_unused_rowset(rowset); - } - // init one delete handler - int32_t end_version = -1; - for (auto& version : versions_to_be_changed) { - if (version.second > end_version) { - end_version = version.second; + // should check the max_version >= request.alter_version, if not the convert is useless + RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); + if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + << " is less than request version=" << request.alter_version; + res = OLAP_ERR_VERSION_NOT_EXIST; + break; + } + // before calculating version_to_be_changed, + // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) + LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite." + << " new_tablet=" << new_tablet->full_name(); + std::vector<RowsetSharedPtr> rowsets_to_delete; + std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; + new_tablet->acquire_version_and_rowsets(&version_rowsets); + for (auto& pair : version_rowsets) { + if (pair.first.second <= max_rowset->end_version()) { + rowsets_to_delete.push_back(pair.second); + } + } + std::vector<RowsetSharedPtr> empty_vec; + new_tablet->modify_rowsets(empty_vec, rowsets_to_delete); + // inherit cumulative_layer_point from base_tablet + // check if new_tablet.ce_point > base_tablet.ce_point? + new_tablet->set_cumulative_layer_point(-1); + // save tablet meta + new_tablet->save_meta(); + for (auto& rowset : rowsets_to_delete) { + // do not call rowset.remove directly, using gc thread to delete it + StorageEngine::instance()->add_unused_rowset(rowset); } - } - res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), - end_version); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() - << ", end_version=" << end_version; + // init one delete handler + int32_t end_version = -1; + for (auto& version : versions_to_be_changed) { + if (version.second > end_version) { + end_version = version.second; + } + } - // release delete handlers which have been inited successfully. - delete_handler.finalize(); - break; - } + res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), + end_version); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() + << ", end_version=" << end_version; - // acquire data sources correspond to history versions - base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, mem_tracker); - if (rs_readers.size() < 1) { - LOG(WARNING) << "fail to acquire all data sources. " - << "version_num=" << versions_to_be_changed.size() - << ", data_source_num=" << rs_readers.size(); - res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS; - break; - } + // release delete handlers which have been inited successfully. + delete_handler.finalize(); + break; + } - for (auto& rs_reader : rs_readers) { - res = rs_reader->init(&reader_context); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); + // acquire data sources correspond to history versions + base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, mem_tracker); + if (rs_readers.size() < 1) { + LOG(WARNING) << "fail to acquire all data sources. " + << "version_num=" << versions_to_be_changed.size() + << ", data_source_num=" << rs_readers.size(); + res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS; break; } - } - } while (0); + for (auto& rs_reader : rs_readers) { + res = rs_reader->init(&reader_context); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); + break; + } + } - new_tablet->release_header_lock(); - base_tablet->release_header_lock(); + } while (0); + } new_tablet->release_push_lock(); Review comment: this PR is to modify RWMutex first, and I will modify it in next PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org