caiconghui commented on a change in pull request #8387:
URL: https://github.com/apache/incubator-doris/pull/8387#discussion_r821481180



##########
File path: be/src/olap/schema_change.cpp
##########
@@ -1490,133 +1493,131 @@ OLAPStatus 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
               << " base_tablet=" << base_tablet->full_name()
               << " new_tablet=" << new_tablet->full_name();
 
-    ReadLock base_migration_rlock(base_tablet->get_migration_lock_ptr(), 
TRY_LOCK);
-    if (!base_migration_rlock.own_lock()) {
+    ReadLock base_migration_rlock(base_tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
         return OLAP_ERR_RWLOCK_ERROR;
     }
-    ReadLock new_migration_rlock(new_tablet->get_migration_lock_ptr(), 
TRY_LOCK);
-    if (!new_migration_rlock.own_lock()) {
+    ReadLock new_migration_rlock(new_tablet->get_migration_lock(), 
std::try_to_lock);
+    if (!new_migration_rlock.owns_lock()) {
         return OLAP_ERR_RWLOCK_ERROR;
     }
 
-    // begin to find deltas to convert from base tablet to new tablet so that
-    // obtain base tablet and new tablet's push lock and header write lock to 
prevent loading data
-    base_tablet->obtain_push_lock();
-    new_tablet->obtain_push_lock();
-    base_tablet->obtain_header_wrlock();
-    new_tablet->obtain_header_wrlock();
-
-    // check if the tablet has alter task
-    // if it has alter task, it means it is under old alter process
-
     std::vector<Version> versions_to_be_changed;
     std::vector<RowsetReaderSharedPtr> rs_readers;
     // delete handlers for new tablet
     DeleteHandler delete_handler;
     std::vector<ColumnId> return_columns;
-    size_t num_cols = base_tablet->tablet_schema().num_columns();
-    return_columns.resize(num_cols);
-    for (int i = 0; i < num_cols; ++i) {
-        return_columns[i] = i;
-    }
 
-    // reader_context is stack variables, it's lifetime should keep the same
-    // with rs_readers
-    RowsetReaderContext reader_context;
-    reader_context.reader_type = READER_ALTER_TABLE;
-    reader_context.tablet_schema = &base_tablet->tablet_schema();
-    reader_context.need_ordered_result = true;
-    reader_context.delete_handler = &delete_handler;
-    reader_context.return_columns = &return_columns;
-    // for schema change, seek_columns is the same to return_columns
-    reader_context.seek_columns = &return_columns;
-    reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
-
-    auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + 
std::to_string(base_tablet->tablet_id()) + "-"
-        + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, 
MemTrackerLevel::TASK);
-
-    do {
-        // get history data to be converted and it will check if there is hold 
in base tablet
-        res = _get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to get version to be changed. res=" << res;
-            break;
-        }
-
-        // should check the max_version >= request.alter_version, if not the 
convert is useless
-        RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
-        if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
-            LOG(WARNING) << "base tablet's max version="
-                         << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
-                         << " is less than request version=" << 
request.alter_version;
-            res = OLAP_ERR_VERSION_NOT_EXIST;
-            break;
-        }
-        // before calculating version_to_be_changed,
-        // remove all data from new tablet, prevent to rewrite data(those 
double pushed when wait)
-        LOG(INFO) << "begin to remove all data from new tablet to prevent 
rewrite."
-                  << " new_tablet=" << new_tablet->full_name();
-        std::vector<RowsetSharedPtr> rowsets_to_delete;
-        std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
-        new_tablet->acquire_version_and_rowsets(&version_rowsets);
-        for (auto& pair : version_rowsets) {
-            if (pair.first.second <= max_rowset->end_version()) {
-                rowsets_to_delete.push_back(pair.second);
+    // begin to find deltas to convert from base tablet to new tablet so that
+    // obtain base tablet and new tablet's push lock and header write lock to 
prevent loading data
+    base_tablet->obtain_push_lock();
+    new_tablet->obtain_push_lock();
+    {
+        WriteLock base_tablet_rdlock(base_tablet->get_header_lock());
+        WriteLock new_tablet_rdlock(new_tablet->get_header_lock());
+        // check if the tablet has alter task
+        // if it has alter task, it means it is under old alter process
+        size_t num_cols = base_tablet->tablet_schema().num_columns();
+        return_columns.resize(num_cols);
+        for (int i = 0; i < num_cols; ++i) {
+            return_columns[i] = i;
+        }
+
+        // reader_context is stack variables, it's lifetime should keep the 
same
+        // with rs_readers
+        RowsetReaderContext reader_context;
+        reader_context.reader_type = READER_ALTER_TABLE;
+        reader_context.tablet_schema = &base_tablet->tablet_schema();
+        reader_context.need_ordered_result = true;
+        reader_context.delete_handler = &delete_handler;
+        reader_context.return_columns = &return_columns;
+        // for schema change, seek_columns is the same to return_columns
+        reader_context.seek_columns = &return_columns;
+        reader_context.sequence_id_idx = 
reader_context.tablet_schema->sequence_col_idx();
+
+        auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + 
std::to_string(base_tablet->tablet_id()) + "-"
+                                                         + 
std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, 
MemTrackerLevel::TASK);
+
+        do {
+            // get history data to be converted and it will check if there is 
hold in base tablet
+            res = _get_versions_to_be_changed(base_tablet, 
&versions_to_be_changed);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "fail to get version to be changed. res=" << 
res;
+                break;
             }
-        }
-        std::vector<RowsetSharedPtr> empty_vec;
-        new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
-        // inherit cumulative_layer_point from base_tablet
-        // check if new_tablet.ce_point > base_tablet.ce_point?
-        new_tablet->set_cumulative_layer_point(-1);
-        // save tablet meta
-        new_tablet->save_meta();
-        for (auto& rowset : rowsets_to_delete) {
-            // do not call rowset.remove directly, using gc thread to delete it
-            StorageEngine::instance()->add_unused_rowset(rowset);
-        }
 
-        // init one delete handler
-        int32_t end_version = -1;
-        for (auto& version : versions_to_be_changed) {
-            if (version.second > end_version) {
-                end_version = version.second;
+            // should check the max_version >= request.alter_version, if not 
the convert is useless
+            RowsetSharedPtr max_rowset = 
base_tablet->rowset_with_max_version();
+            if (max_rowset == nullptr || max_rowset->end_version() < 
request.alter_version) {
+                LOG(WARNING) << "base tablet's max version="
+                             << (max_rowset == nullptr ? 0 : 
max_rowset->end_version())
+                             << " is less than request version=" << 
request.alter_version;
+                res = OLAP_ERR_VERSION_NOT_EXIST;
+                break;
+            }
+            // before calculating version_to_be_changed,
+            // remove all data from new tablet, prevent to rewrite data(those 
double pushed when wait)
+            LOG(INFO) << "begin to remove all data from new tablet to prevent 
rewrite."
+                      << " new_tablet=" << new_tablet->full_name();
+            std::vector<RowsetSharedPtr> rowsets_to_delete;
+            std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+            new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            for (auto& pair : version_rowsets) {
+                if (pair.first.second <= max_rowset->end_version()) {
+                    rowsets_to_delete.push_back(pair.second);
+                }
+            }
+            std::vector<RowsetSharedPtr> empty_vec;
+            new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
+            // inherit cumulative_layer_point from base_tablet
+            // check if new_tablet.ce_point > base_tablet.ce_point?
+            new_tablet->set_cumulative_layer_point(-1);
+            // save tablet meta
+            new_tablet->save_meta();
+            for (auto& rowset : rowsets_to_delete) {
+                // do not call rowset.remove directly, using gc thread to 
delete it
+                StorageEngine::instance()->add_unused_rowset(rowset);
             }
-        }
 
-        res = delete_handler.init(base_tablet->tablet_schema(), 
base_tablet->delete_predicates(),
-                                  end_version);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "init delete handler failed. base_tablet=" << 
base_tablet->full_name()
-                         << ", end_version=" << end_version;
+            // init one delete handler
+            int32_t end_version = -1;
+            for (auto& version : versions_to_be_changed) {
+                if (version.second > end_version) {
+                    end_version = version.second;
+                }
+            }
 
-            // release delete handlers which have been inited successfully.
-            delete_handler.finalize();
-            break;
-        }
+            res = delete_handler.init(base_tablet->tablet_schema(), 
base_tablet->delete_predicates(),
+                                      end_version);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "init delete handler failed. base_tablet=" << 
base_tablet->full_name()
+                             << ", end_version=" << end_version;
 
-        // acquire data sources correspond to history versions
-        base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, 
mem_tracker);
-        if (rs_readers.size() < 1) {
-            LOG(WARNING) << "fail to acquire all data sources. "
-                         << "version_num=" << versions_to_be_changed.size()
-                         << ", data_source_num=" << rs_readers.size();
-            res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
-            break;
-        }
+                // release delete handlers which have been inited successfully.
+                delete_handler.finalize();
+                break;
+            }
 
-        for (auto& rs_reader : rs_readers) {
-            res = rs_reader->init(&reader_context);
-            if (res != OLAP_SUCCESS) {
-                LOG(WARNING) << "failed to init rowset reader: " << 
base_tablet->full_name();
+            // acquire data sources correspond to history versions
+            base_tablet->capture_rs_readers(versions_to_be_changed, 
&rs_readers, mem_tracker);
+            if (rs_readers.size() < 1) {
+                LOG(WARNING) << "fail to acquire all data sources. "
+                             << "version_num=" << versions_to_be_changed.size()
+                             << ", data_source_num=" << rs_readers.size();
+                res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
                 break;
             }
-        }
 
-    } while (0);
+            for (auto& rs_reader : rs_readers) {
+                res = rs_reader->init(&reader_context);
+                if (res != OLAP_SUCCESS) {
+                    LOG(WARNING) << "failed to init rowset reader: " << 
base_tablet->full_name();
+                    break;
+                }
+            }
 
-    new_tablet->release_header_lock();
-    base_tablet->release_header_lock();
+        } while (0);
+    }
     new_tablet->release_push_lock();

Review comment:
       this PR is to modify RWMutex first, and I will modify it in next PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to