yiguolei commented on a change in pull request #8387: URL: https://github.com/apache/incubator-doris/pull/8387#discussion_r821421283
########## File path: be/src/olap/compaction.cpp ########## @@ -23,6 +23,8 @@ #include "util/trace.h" using std::vector; +using ReadLock = std::shared_lock<std::shared_mutex>; +using WriteLock = std::unique_lock<std::shared_mutex>; Review comment: Maybe we could move these declarations to a common header file if you have replace all write lock and read lock implementations. ########## File path: be/src/olap/reader.cpp ########## @@ -819,12 +820,12 @@ OLAPStatus TabletReader::_init_delete_condition(const ReaderParams& read_params) if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) { return OLAP_SUCCESS; } - - _tablet->obtain_header_rdlock(); - OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(), - read_params.version.second, this); - _tablet->release_header_lock(); - + OLAPStatus ret; + { Review comment: why add { here? ########## File path: be/src/olap/tablet_manager.cpp ########## @@ -128,25 +127,26 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s } } - existed_tablet->obtain_header_rdlock(); const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version(); const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version(); - - // If new tablet is empty, it is a newly created schema change tablet. - // the old tablet is dropped before add tablet. it should not exist old tablet - if (new_rowset == nullptr) { - existed_tablet->release_header_lock(); - // it seems useless to call unlock and return here. - // it could prevent error when log level is changed in the future. - LOG(FATAL) << "new tablet is empty and old tablet exists. it should not happen." 
- << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash; - return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; + int64_t old_time, new_time; + int32_t old_version, new_version; + { + ReadLock rdlock(existed_tablet->get_header_lock()); Review comment: the scope of this lock is not the same as the old one ########## File path: be/src/olap/push_handler.cpp ########## @@ -116,11 +118,12 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP DeletePredicatePB del_pred; DeleteConditionHandler del_cond_handler; - tablet_var.tablet->obtain_header_rdlock(); - res = del_cond_handler.generate_delete_predicate(tablet_var.tablet->tablet_schema(), - request.delete_conditions, &del_pred); - del_preds.push(del_pred); - tablet_var.tablet->release_header_lock(); + { Review comment: why add { here? ########## File path: be/src/olap/schema_change.cpp ########## @@ -1490,133 +1493,131 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe << " base_tablet=" << base_tablet->full_name() << " new_tablet=" << new_tablet->full_name(); - ReadLock base_migration_rlock(base_tablet->get_migration_lock_ptr(), TRY_LOCK); - if (!base_migration_rlock.own_lock()) { + ReadLock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - ReadLock new_migration_rlock(new_tablet->get_migration_lock_ptr(), TRY_LOCK); - if (!new_migration_rlock.own_lock()) { + ReadLock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock); + if (!new_migration_rlock.owns_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - // begin to find deltas to convert from base tablet to new tablet so that - // obtain base tablet and new tablet's push lock and header write lock to prevent loading data - base_tablet->obtain_push_lock(); - new_tablet->obtain_push_lock(); - base_tablet->obtain_header_wrlock(); - new_tablet->obtain_header_wrlock(); - - // check if the tablet has 
alter task - // if it has alter task, it means it is under old alter process - std::vector<Version> versions_to_be_changed; std::vector<RowsetReaderSharedPtr> rs_readers; // delete handlers for new tablet DeleteHandler delete_handler; std::vector<ColumnId> return_columns; - size_t num_cols = base_tablet->tablet_schema().num_columns(); - return_columns.resize(num_cols); - for (int i = 0; i < num_cols; ++i) { - return_columns[i] = i; - } - // reader_context is stack variables, it's lifetime should keep the same - // with rs_readers - RowsetReaderContext reader_context; - reader_context.reader_type = READER_ALTER_TABLE; - reader_context.tablet_schema = &base_tablet->tablet_schema(); - reader_context.need_ordered_result = true; - reader_context.delete_handler = &delete_handler; - reader_context.return_columns = &return_columns; - // for schema change, seek_columns is the same to return_columns - reader_context.seek_columns = &return_columns; - reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); - - auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" - + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK); - - do { - // get history data to be converted and it will check if there is hold in base tablet - res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to get version to be changed. res=" << res; - break; - } - - // should check the max_version >= request.alter_version, if not the convert is useless - RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); - if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { - LOG(WARNING) << "base tablet's max version=" - << (max_rowset == nullptr ? 
0 : max_rowset->end_version()) - << " is less than request version=" << request.alter_version; - res = OLAP_ERR_VERSION_NOT_EXIST; - break; - } - // before calculating version_to_be_changed, - // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) - LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite." - << " new_tablet=" << new_tablet->full_name(); - std::vector<RowsetSharedPtr> rowsets_to_delete; - std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; - new_tablet->acquire_version_and_rowsets(&version_rowsets); - for (auto& pair : version_rowsets) { - if (pair.first.second <= max_rowset->end_version()) { - rowsets_to_delete.push_back(pair.second); + // begin to find deltas to convert from base tablet to new tablet so that + // obtain base tablet and new tablet's push lock and header write lock to prevent loading data + base_tablet->obtain_push_lock(); + new_tablet->obtain_push_lock(); + { + WriteLock base_tablet_rdlock(base_tablet->get_header_lock()); + WriteLock new_tablet_rdlock(new_tablet->get_header_lock()); + // check if the tablet has alter task + // if it has alter task, it means it is under old alter process + size_t num_cols = base_tablet->tablet_schema().num_columns(); + return_columns.resize(num_cols); + for (int i = 0; i < num_cols; ++i) { + return_columns[i] = i; + } + + // reader_context is stack variables, it's lifetime should keep the same + // with rs_readers + RowsetReaderContext reader_context; + reader_context.reader_type = READER_ALTER_TABLE; + reader_context.tablet_schema = &base_tablet->tablet_schema(); + reader_context.need_ordered_result = true; + reader_context.delete_handler = &delete_handler; + reader_context.return_columns = &return_columns; + // for schema change, seek_columns is the same to return_columns + reader_context.seek_columns = &return_columns; + reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); + + auto mem_tracker = 
MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" + + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK); + + do { + // get history data to be converted and it will check if there is hold in base tablet + res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to get version to be changed. res=" << res; + break; } - } - std::vector<RowsetSharedPtr> empty_vec; - new_tablet->modify_rowsets(empty_vec, rowsets_to_delete); - // inherit cumulative_layer_point from base_tablet - // check if new_tablet.ce_point > base_tablet.ce_point? - new_tablet->set_cumulative_layer_point(-1); - // save tablet meta - new_tablet->save_meta(); - for (auto& rowset : rowsets_to_delete) { - // do not call rowset.remove directly, using gc thread to delete it - StorageEngine::instance()->add_unused_rowset(rowset); - } - // init one delete handler - int32_t end_version = -1; - for (auto& version : versions_to_be_changed) { - if (version.second > end_version) { - end_version = version.second; + // should check the max_version >= request.alter_version, if not the convert is useless + RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); + if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + << " is less than request version=" << request.alter_version; + res = OLAP_ERR_VERSION_NOT_EXIST; + break; + } + // before calculating version_to_be_changed, + // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) + LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite." 
+ << " new_tablet=" << new_tablet->full_name(); + std::vector<RowsetSharedPtr> rowsets_to_delete; + std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; + new_tablet->acquire_version_and_rowsets(&version_rowsets); + for (auto& pair : version_rowsets) { + if (pair.first.second <= max_rowset->end_version()) { + rowsets_to_delete.push_back(pair.second); + } + } + std::vector<RowsetSharedPtr> empty_vec; + new_tablet->modify_rowsets(empty_vec, rowsets_to_delete); + // inherit cumulative_layer_point from base_tablet + // check if new_tablet.ce_point > base_tablet.ce_point? + new_tablet->set_cumulative_layer_point(-1); + // save tablet meta + new_tablet->save_meta(); + for (auto& rowset : rowsets_to_delete) { + // do not call rowset.remove directly, using gc thread to delete it + StorageEngine::instance()->add_unused_rowset(rowset); } - } - res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), - end_version); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() - << ", end_version=" << end_version; + // init one delete handler + int32_t end_version = -1; + for (auto& version : versions_to_be_changed) { + if (version.second > end_version) { + end_version = version.second; + } + } - // release delete handlers which have been inited successfully. - delete_handler.finalize(); - break; - } + res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), + end_version); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() + << ", end_version=" << end_version; - // acquire data sources correspond to history versions - base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, mem_tracker); - if (rs_readers.size() < 1) { - LOG(WARNING) << "fail to acquire all data sources. 
" - << "version_num=" << versions_to_be_changed.size() - << ", data_source_num=" << rs_readers.size(); - res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS; - break; - } + // release delete handlers which have been inited successfully. + delete_handler.finalize(); + break; + } - for (auto& rs_reader : rs_readers) { - res = rs_reader->init(&reader_context); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); + // acquire data sources correspond to history versions + base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, mem_tracker); + if (rs_readers.size() < 1) { + LOG(WARNING) << "fail to acquire all data sources. " + << "version_num=" << versions_to_be_changed.size() + << ", data_source_num=" << rs_readers.size(); + res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS; break; } - } - } while (0); + for (auto& rs_reader : rs_readers) { + res = rs_reader->init(&reader_context); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); + break; + } + } - new_tablet->release_header_lock(); - base_tablet->release_header_lock(); + } while (0); + } new_tablet->release_push_lock(); Review comment: I think we could refer to ClickHouse's scope_safe_guard.h; then we could use RAII to manage acquiring and releasing the lock, and we would never forget to unlock. There is a lot of code like this in Doris: acquire lock xxxx do something xxxx release lock xxxx ########## File path: be/src/olap/tablet_manager.cpp ########## @@ -128,25 +127,26 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s } } - existed_tablet->obtain_header_rdlock(); const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version(); const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version(); - - // If new tablet is empty, it is a newly created schema change tablet. - // the old tablet is dropped before add tablet. 
it should not exist old tablet - if (new_rowset == nullptr) { - existed_tablet->release_header_lock(); - // it seems useless to call unlock and return here. - // it could prevent error when log level is changed in the future. - LOG(FATAL) << "new tablet is empty and old tablet exists. it should not happen." - << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash; - return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE; + int64_t old_time, new_time; + int32_t old_version, new_version; + { + ReadLock rdlock(existed_tablet->get_header_lock()); Review comment: If we use a scope guard, much of this code will not need to be changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org