yiguolei commented on a change in pull request #8387:
URL: https://github.com/apache/incubator-doris/pull/8387#discussion_r821421283



##########
File path: be/src/olap/compaction.cpp
##########
@@ -23,6 +23,8 @@
 #include "util/trace.h"
 
 using std::vector;
+using ReadLock = std::shared_lock<std::shared_mutex>;
+using WriteLock = std::unique_lock<std::shared_mutex>;

Review comment:
       Maybe we could move these declarations to a common header file if you 
have replaced all write lock and read lock implementations.

##########
File path: be/src/olap/reader.cpp
##########
@@ -819,12 +820,12 @@ OLAPStatus TabletReader::_init_delete_condition(const 
ReaderParams& read_params)
     if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
         return OLAP_SUCCESS;
     }
-
-    _tablet->obtain_header_rdlock();
-    OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(), 
_tablet->delete_predicates(),
-                                          read_params.version.second, this);
-    _tablet->release_header_lock();
-
+    OLAPStatus ret;
+    {

Review comment:
       Why add `{` here?

##########
File path: be/src/olap/tablet_manager.cpp
##########
@@ -128,25 +127,26 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId 
tablet_id, SchemaHash s
         }
     }
 
-    existed_tablet->obtain_header_rdlock();
     const RowsetSharedPtr old_rowset = 
existed_tablet->rowset_with_max_version();
     const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
-
-    // If new tablet is empty, it is a newly created schema change tablet.
-    // the old tablet is dropped before add tablet. it should not exist old 
tablet
-    if (new_rowset == nullptr) {
-        existed_tablet->release_header_lock();
-        // it seems useless to call unlock and return here.
-        // it could prevent error when log level is changed in the future.
-        LOG(FATAL) << "new tablet is empty and old tablet exists. it should 
not happen."
-                   << " tablet_id=" << tablet_id << " schema_hash=" << 
schema_hash;
-        return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+    int64_t old_time, new_time;
+    int32_t old_version, new_version;
+    {
+        ReadLock rdlock(existed_tablet->get_header_lock());

Review comment:
       the scope of this lock is not the same as the old one

##########
File path: be/src/olap/push_handler.cpp
##########
@@ -116,11 +118,12 @@ OLAPStatus 
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP
 
             DeletePredicatePB del_pred;
             DeleteConditionHandler del_cond_handler;
-            tablet_var.tablet->obtain_header_rdlock();
-            res = 
del_cond_handler.generate_delete_predicate(tablet_var.tablet->tablet_schema(),
-                                                             
request.delete_conditions, &del_pred);
-            del_preds.push(del_pred);
-            tablet_var.tablet->release_header_lock();
+            {

Review comment:
       Why add `{` here?

##########
File path: be/src/olap/schema_change.cpp
##########
@@ -1490,133 +1493,131 @@ OLAPStatus 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
               << " base_tablet=" << base_tablet->full_name()
               << " new_tablet=" << new_tablet->full_name();
 
-    ReadLock base_migration_rlock(base_tablet->get_migration_lock_ptr(), 
TRY_LOCK);
-    if (!base_migration_rlock.own_lock()) {
+    ReadLock base_migration_rlock(base_tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
         return OLAP_ERR_RWLOCK_ERROR;
     }
-    ReadLock new_migration_rlock(new_tablet->get_migration_lock_ptr(), 
TRY_LOCK);
-    if (!new_migration_rlock.own_lock()) {
+    ReadLock new_migration_rlock(new_tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!new_migration_rlock.owns_lock()) {
         return OLAP_ERR_RWLOCK_ERROR;
     }
 
-    // begin to find deltas to convert from base tablet to new tablet so that
-    // obtain base tablet and new tablet's push lock and header write lock to 
prevent loading data
-    base_tablet->obtain_push_lock();
-    new_tablet->obtain_push_lock();
-    base_tablet->obtain_header_wrlock();
-    new_tablet->obtain_header_wrlock();
-
-    // check if the tablet has alter task
-    // if it has alter task, it means it is under old alter process
-
     std::vector<Version> versions_to_be_changed;
     std::vector<RowsetReaderSharedPtr> rs_readers;
     // delete handlers for new tablet
     DeleteHandler delete_handler;
     std::vector<ColumnId> return_columns;
-    size_t num_cols = base_tablet->tablet_schema().num_columns();
-    return_columns.resize(num_cols);
-    for (int i = 0; i < num_cols; ++i) {
-        return_columns[i] = i;
-    }
 
-    // reader_context is stack variables, it's lifetime should keep the same
-    // with rs_readers
-    RowsetReaderContext reader_context;
-    reader_context.reader_type = READER_ALTER_TABLE;
-    reader_context.tablet_schema = &base_tablet->tablet_schema();
-    reader_context.need_ordered_result = true;
-    reader_context.delete_handler = &delete_handler;
-    reader_context.return_columns = &return_columns;
-    // for schema change, seek_columns is the same to return_columns
-    reader_context.seek_columns = &return_columns;
-    reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
-
-    auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + 
std::to_string(base_tablet->tablet_id()) + "-"
-        + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, 
MemTrackerLevel::TASK);
-
-    do {
-        // get history data to be converted and it will check if there is hold 
in base tablet
-        res = _get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to get version to be changed. res=" << res;
-            break;
-        }
-
-        // should check the max_version >= request.alter_version, if not the 
convert is useless
-        RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
-        if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
-            LOG(WARNING) << "base tablet's max version="
-                         << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
-                         << " is less than request version=" << 
request.alter_version;
-            res = OLAP_ERR_VERSION_NOT_EXIST;
-            break;
-        }
-        // before calculating version_to_be_changed,
-        // remove all data from new tablet, prevent to rewrite data(those 
double pushed when wait)
-        LOG(INFO) << "begin to remove all data from new tablet to prevent 
rewrite."
-                  << " new_tablet=" << new_tablet->full_name();
-        std::vector<RowsetSharedPtr> rowsets_to_delete;
-        std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
-        new_tablet->acquire_version_and_rowsets(&version_rowsets);
-        for (auto& pair : version_rowsets) {
-            if (pair.first.second <= max_rowset->end_version()) {
-                rowsets_to_delete.push_back(pair.second);
+    // begin to find deltas to convert from base tablet to new tablet so that
+    // obtain base tablet and new tablet's push lock and header write lock to 
prevent loading data
+    base_tablet->obtain_push_lock();
+    new_tablet->obtain_push_lock();
+    {
+        WriteLock base_tablet_rdlock(base_tablet->get_header_lock());
+        WriteLock new_tablet_rdlock(new_tablet->get_header_lock());
+        // check if the tablet has alter task
+        // if it has alter task, it means it is under old alter process
+        size_t num_cols = base_tablet->tablet_schema().num_columns();
+        return_columns.resize(num_cols);
+        for (int i = 0; i < num_cols; ++i) {
+            return_columns[i] = i;
+        }
+
+        // reader_context is stack variables, it's lifetime should keep the 
same
+        // with rs_readers
+        RowsetReaderContext reader_context;
+        reader_context.reader_type = READER_ALTER_TABLE;
+        reader_context.tablet_schema = &base_tablet->tablet_schema();
+        reader_context.need_ordered_result = true;
+        reader_context.delete_handler = &delete_handler;
+        reader_context.return_columns = &return_columns;
+        // for schema change, seek_columns is the same to return_columns
+        reader_context.seek_columns = &return_columns;
+        reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
+
+        auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + 
std::to_string(base_tablet->tablet_id()) + "-"
+                                                         + 
std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, 
MemTrackerLevel::TASK);
+
+        do {
+            // get history data to be converted and it will check if there is 
hold in base tablet
+            res = _get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "fail to get version to be changed. res=" << 
res;
+                break;
             }
-        }
-        std::vector<RowsetSharedPtr> empty_vec;
-        new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
-        // inherit cumulative_layer_point from base_tablet
-        // check if new_tablet.ce_point > base_tablet.ce_point?
-        new_tablet->set_cumulative_layer_point(-1);
-        // save tablet meta
-        new_tablet->save_meta();
-        for (auto& rowset : rowsets_to_delete) {
-            // do not call rowset.remove directly, using gc thread to delete it
-            StorageEngine::instance()->add_unused_rowset(rowset);
-        }
 
-        // init one delete handler
-        int32_t end_version = -1;
-        for (auto& version : versions_to_be_changed) {
-            if (version.second > end_version) {
-                end_version = version.second;
+            // should check the max_version >= request.alter_version, if not 
the convert is useless
+            RowsetSharedPtr max_rowset = 
base_tablet->rowset_with_max_version();
+            if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
+                LOG(WARNING) << "base tablet's max version="
+                             << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
+                             << " is less than request version=" << 
request.alter_version;
+                res = OLAP_ERR_VERSION_NOT_EXIST;
+                break;
+            }
+            // before calculating version_to_be_changed,
+            // remove all data from new tablet, prevent to rewrite data(those 
double pushed when wait)
+            LOG(INFO) << "begin to remove all data from new tablet to prevent 
rewrite."
+                      << " new_tablet=" << new_tablet->full_name();
+            std::vector<RowsetSharedPtr> rowsets_to_delete;
+            std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+            new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            for (auto& pair : version_rowsets) {
+                if (pair.first.second <= max_rowset->end_version()) {
+                    rowsets_to_delete.push_back(pair.second);
+                }
+            }
+            std::vector<RowsetSharedPtr> empty_vec;
+            new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
+            // inherit cumulative_layer_point from base_tablet
+            // check if new_tablet.ce_point > base_tablet.ce_point?
+            new_tablet->set_cumulative_layer_point(-1);
+            // save tablet meta
+            new_tablet->save_meta();
+            for (auto& rowset : rowsets_to_delete) {
+                // do not call rowset.remove directly, using gc thread to 
delete it
+                StorageEngine::instance()->add_unused_rowset(rowset);
             }
-        }
 
-        res = delete_handler.init(base_tablet->tablet_schema(), 
base_tablet->delete_predicates(),
-                                  end_version);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "init delete handler failed. base_tablet=" << 
base_tablet->full_name()
-                         << ", end_version=" << end_version;
+            // init one delete handler
+            int32_t end_version = -1;
+            for (auto& version : versions_to_be_changed) {
+                if (version.second > end_version) {
+                    end_version = version.second;
+                }
+            }
 
-            // release delete handlers which have been inited successfully.
-            delete_handler.finalize();
-            break;
-        }
+            res = delete_handler.init(base_tablet->tablet_schema(), 
base_tablet->delete_predicates(),
+                                      end_version);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "init delete handler failed. base_tablet=" << 
base_tablet->full_name()
+                             << ", end_version=" << end_version;
 
-        // acquire data sources correspond to history versions
-        base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, 
mem_tracker);
-        if (rs_readers.size() < 1) {
-            LOG(WARNING) << "fail to acquire all data sources. "
-                         << "version_num=" << versions_to_be_changed.size()
-                         << ", data_source_num=" << rs_readers.size();
-            res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
-            break;
-        }
+                // release delete handlers which have been inited successfully.
+                delete_handler.finalize();
+                break;
+            }
 
-        for (auto& rs_reader : rs_readers) {
-            res = rs_reader->init(&reader_context);
-            if (res != OLAP_SUCCESS) {
-                LOG(WARNING) << "failed to init rowset reader: " << 
base_tablet->full_name();
+            // acquire data sources correspond to history versions
+            base_tablet->capture_rs_readers(versions_to_be_changed, 
&rs_readers, mem_tracker);
+            if (rs_readers.size() < 1) {
+                LOG(WARNING) << "fail to acquire all data sources. "
+                             << "version_num=" << versions_to_be_changed.size()
+                             << ", data_source_num=" << rs_readers.size();
+                res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
                 break;
             }
-        }
 
-    } while (0);
+            for (auto& rs_reader : rs_readers) {
+                res = rs_reader->init(&reader_context);
+                if (res != OLAP_SUCCESS) {
+                    LOG(WARNING) << "failed to init rowset reader: " << 
base_tablet->full_name();
+                    break;
+                }
+            }
 
-    new_tablet->release_header_lock();
-    base_tablet->release_header_lock();
+        } while (0);
+    }
     new_tablet->release_push_lock();

Review comment:
       I think we could use ClickHouse's scope_safe_guard.h as a reference; then 
we could use RAII to manage locking and unlocking, and we would never forget to 
unlock.
   There is a lot of code like this in Doris:
   
   acquire lock xxxx
   do something xxxx
   release lock xxxx
   

##########
File path: be/src/olap/tablet_manager.cpp
##########
@@ -128,25 +127,26 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId 
tablet_id, SchemaHash s
         }
     }
 
-    existed_tablet->obtain_header_rdlock();
     const RowsetSharedPtr old_rowset = 
existed_tablet->rowset_with_max_version();
     const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
-
-    // If new tablet is empty, it is a newly created schema change tablet.
-    // the old tablet is dropped before add tablet. it should not exist old 
tablet
-    if (new_rowset == nullptr) {
-        existed_tablet->release_header_lock();
-        // it seems useless to call unlock and return here.
-        // it could prevent error when log level is changed in the future.
-        LOG(FATAL) << "new tablet is empty and old tablet exists. it should 
not happen."
-                   << " tablet_id=" << tablet_id << " schema_hash=" << 
schema_hash;
-        return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+    int64_t old_time, new_time;
+    int32_t old_version, new_version;
+    {
+        ReadLock rdlock(existed_tablet->get_header_lock());

Review comment:
       If we use a scope guard, a lot of this code will not need to be changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to