morningman commented on a change in pull request #8553:
URL: https://github.com/apache/incubator-doris/pull/8553#discussion_r834047621



##########
File path: be/src/olap/task/engine_storage_migration_task.cpp
##########
@@ -32,91 +32,147 @@ OLAPStatus EngineStorageMigrationTask::execute() {
     return _migrate();
 }
 
-OLAPStatus EngineStorageMigrationTask::_migrate() {
-    int64_t tablet_id = _tablet->tablet_id();
-    int32_t schema_hash = _tablet->schema_hash();
-    LOG(INFO) << "begin to process tablet migrate. "
-              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
-
-    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, 
int32_t* end_version,
+                                        std::vector<RowsetSharedPtr> 
*consistent_rowsets) {
+    ReadLock rdlock(_tablet->get_header_lock());
+    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    if (last_version == nullptr) {
+        LOG(WARNING) << "failed to get rowset with max version, tablet="
+                        << _tablet->full_name();
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
 
-    // try hold migration lock first
-    OLAPStatus res = OLAP_SUCCESS;
-    UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
-    if (!migration_wlock.owns_lock()) {
-        return OLAP_ERR_RWLOCK_ERROR;
+    *end_version = last_version->end_version();
+    if (*end_version < start_version) {
+        // rowsets are empty
+        VLOG_DEBUG << "consistent rowsets empty. tablet=" << 
_tablet->full_name()
+                        << ", start_version=" << start_version << ", 
end_version=" << *end_version;
+        return OLAP_SUCCESS;
     }
+    _tablet->capture_consistent_rowsets(Version(start_version, *end_version), 
consistent_rowsets);
+    if (consistent_rowsets->empty()) {
+        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << 
_tablet->full_name()
+                        << ", version=" << *end_version;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+    return OLAP_SUCCESS;
+}
 
-    // check if this tablet has related running txns. if yes, can not do 
migration.
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+    // need hold migration lock outside
     int64_t partition_id;
     std::set<int64_t> transaction_ids;
+    // check if this tablet has related running txns. if yes, can not do 
migration.
     StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
-            tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id, 
&transaction_ids);
+            _tablet->tablet_id(), _tablet->schema_hash(), 
_tablet->tablet_uid(), &partition_id, &transaction_ids);
     if (transaction_ids.size() > 0) {
         LOG(WARNING) << "could not migration because has unfinished txns, "
-                     << " tablet=" << _tablet->full_name();
+                    << " tablet=" << _tablet->full_name();
         return OLAP_ERR_HEADER_HAS_PENDING_DATA;
     }
+    return OLAP_SUCCESS;
+}
 
-    _tablet->obtain_push_lock();
+OLAPStatus EngineStorageMigrationTask::_migrate() {
+    int64_t tablet_id = _tablet->tablet_id();
+    int32_t schema_hash = _tablet->schema_hash();
+    LOG(INFO) << "begin to process tablet migrate. "
+              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
 
-    // TODO(ygl): the tablet should not under schema change or rollup or load
-    do {
-        std::vector<RowsetSharedPtr> consistent_rowsets;
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            // get all versions to be migrate
-            const RowsetSharedPtr last_version = 
_tablet->rowset_with_max_version();
-            if (last_version == nullptr) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "failed to get rowset with max version, 
tablet="
-                             << _tablet->full_name();
-                break;
-            }
-            int32_t end_version = last_version->end_version();
-            res = _tablet->capture_consistent_rowsets(Version(0, end_version), 
&consistent_rowsets);
-            if (consistent_rowsets.empty()) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "fail to capture consistent rowsets. tablet=" 
<< _tablet->full_name()
-                             << ", version=" << end_version;
-                break;
-            }
+    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+    int32_t start_version = 0;
+    int32_t end_version = 0;
+    std::vector<RowsetSharedPtr> consistent_rowsets;
+
+    // try hold migration lock first
+    OLAPStatus res = OLAP_SUCCESS;
+    {
+        UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
+        if (!migration_wlock.owns_lock()) {
+            return OLAP_ERR_RWLOCK_ERROR;
         }
 
-        uint64_t shard = 0;
-        res = _dest_store->get_shard(&shard);
+        // check if this tablet has related running txns. if yes, can not do 
migration.
+        res = _check_running_txns();
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to get shard from store: " << 
_dest_store->path();
-            break;
-        }
-        FilePathDescStream root_path_desc_s;
-        root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << 
shard;
-        FilePathDesc full_path_desc = 
SnapshotManager::instance()->get_schema_hash_full_path(
-                _tablet, root_path_desc_s.path_desc());
-        string full_path = full_path_desc.filepath;
-        // if dir already exist then return err, it should not happen.
-        // should not remove the dir directly, for safety reason.
-        if (FileUtils::check_exist(full_path)) {
-            LOG(INFO) << "schema hash path already exist, skip this path. "
-                      << "full_path=" << full_path;
-            res = OLAP_ERR_FILE_ALREADY_EXIST;
-            break;
+            return res;
         }
 
-        Status st = FileUtils::create_dir(full_path);
-        if (!st.ok()) {
-            res = OLAP_ERR_CANNOT_CREATE_DIR;
-            LOG(WARNING) << "fail to create path. path=" << full_path
-                         << ", error:" << st.to_string();
-            break;
+        _tablet->obtain_push_lock();

Review comment:
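       Use RAII (a scoped guard) for the push lock here, instead of pairing `obtain_push_lock()` / `release_push_lock()` by hand.
   Rebase onto master and see PR #8452.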

##########
File path: docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
##########
@@ -0,0 +1,52 @@
+---

Review comment:
       This doc also needs to be added to the sidebar configs `docs/.vuepress/sidebar/en.js` and `docs/.vuepress/sidebar/zh-CN.js`.

##########
File path: be/src/olap/task/engine_storage_migration_task.cpp
##########
@@ -32,91 +32,147 @@ OLAPStatus EngineStorageMigrationTask::execute() {
     return _migrate();
 }
 
-OLAPStatus EngineStorageMigrationTask::_migrate() {
-    int64_t tablet_id = _tablet->tablet_id();
-    int32_t schema_hash = _tablet->schema_hash();
-    LOG(INFO) << "begin to process tablet migrate. "
-              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
-
-    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, 
int32_t* end_version,
+                                        std::vector<RowsetSharedPtr> 
*consistent_rowsets) {
+    ReadLock rdlock(_tablet->get_header_lock());
+    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    if (last_version == nullptr) {
+        LOG(WARNING) << "failed to get rowset with max version, tablet="
+                        << _tablet->full_name();
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
 
-    // try hold migration lock first
-    OLAPStatus res = OLAP_SUCCESS;
-    UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
-    if (!migration_wlock.owns_lock()) {
-        return OLAP_ERR_RWLOCK_ERROR;
+    *end_version = last_version->end_version();
+    if (*end_version < start_version) {
+        // rowsets are empty
+        VLOG_DEBUG << "consistent rowsets empty. tablet=" << 
_tablet->full_name()
+                        << ", start_version=" << start_version << ", 
end_version=" << *end_version;
+        return OLAP_SUCCESS;
     }
+    _tablet->capture_consistent_rowsets(Version(start_version, *end_version), 
consistent_rowsets);
+    if (consistent_rowsets->empty()) {
+        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << 
_tablet->full_name()
+                        << ", version=" << *end_version;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+    return OLAP_SUCCESS;
+}
 
-    // check if this tablet has related running txns. if yes, can not do 
migration.
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+    // need hold migration lock outside
     int64_t partition_id;
     std::set<int64_t> transaction_ids;
+    // check if this tablet has related running txns. if yes, can not do 
migration.
     StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
-            tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id, 
&transaction_ids);
+            _tablet->tablet_id(), _tablet->schema_hash(), 
_tablet->tablet_uid(), &partition_id, &transaction_ids);
     if (transaction_ids.size() > 0) {
         LOG(WARNING) << "could not migration because has unfinished txns, "
-                     << " tablet=" << _tablet->full_name();
+                    << " tablet=" << _tablet->full_name();
         return OLAP_ERR_HEADER_HAS_PENDING_DATA;
     }
+    return OLAP_SUCCESS;
+}
 
-    _tablet->obtain_push_lock();
+OLAPStatus EngineStorageMigrationTask::_migrate() {
+    int64_t tablet_id = _tablet->tablet_id();
+    int32_t schema_hash = _tablet->schema_hash();
+    LOG(INFO) << "begin to process tablet migrate. "
+              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
 
-    // TODO(ygl): the tablet should not under schema change or rollup or load
-    do {
-        std::vector<RowsetSharedPtr> consistent_rowsets;
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            // get all versions to be migrate
-            const RowsetSharedPtr last_version = 
_tablet->rowset_with_max_version();
-            if (last_version == nullptr) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "failed to get rowset with max version, 
tablet="
-                             << _tablet->full_name();
-                break;
-            }
-            int32_t end_version = last_version->end_version();
-            res = _tablet->capture_consistent_rowsets(Version(0, end_version), 
&consistent_rowsets);
-            if (consistent_rowsets.empty()) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "fail to capture consistent rowsets. tablet=" 
<< _tablet->full_name()
-                             << ", version=" << end_version;
-                break;
-            }
+    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+    int32_t start_version = 0;
+    int32_t end_version = 0;
+    std::vector<RowsetSharedPtr> consistent_rowsets;
+
+    // try hold migration lock first
+    OLAPStatus res = OLAP_SUCCESS;
+    {
+        UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
+        if (!migration_wlock.owns_lock()) {
+            return OLAP_ERR_RWLOCK_ERROR;
         }
 
-        uint64_t shard = 0;
-        res = _dest_store->get_shard(&shard);
+        // check if this tablet has related running txns. if yes, can not do 
migration.
+        res = _check_running_txns();
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to get shard from store: " << 
_dest_store->path();
-            break;
-        }
-        FilePathDescStream root_path_desc_s;
-        root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << 
shard;
-        FilePathDesc full_path_desc = 
SnapshotManager::instance()->get_schema_hash_full_path(
-                _tablet, root_path_desc_s.path_desc());
-        string full_path = full_path_desc.filepath;
-        // if dir already exist then return err, it should not happen.
-        // should not remove the dir directly, for safety reason.
-        if (FileUtils::check_exist(full_path)) {
-            LOG(INFO) << "schema hash path already exist, skip this path. "
-                      << "full_path=" << full_path;
-            res = OLAP_ERR_FILE_ALREADY_EXIST;
-            break;
+            return res;
         }
 
-        Status st = FileUtils::create_dir(full_path);
-        if (!st.ok()) {
-            res = OLAP_ERR_CANNOT_CREATE_DIR;
-            LOG(WARNING) << "fail to create path. path=" << full_path
-                         << ", error:" << st.to_string();
-            break;
+        _tablet->obtain_push_lock();
+        // get versions to be migrate
+        res = _get_versions(start_version, &end_version, &consistent_rowsets);
+        _tablet->release_push_lock();
+        if (res != OLAP_SUCCESS) {
+            return res;
         }
+    }
+
+    // TODO(ygl): the tablet should not under schema change or rollup or load
+    uint64_t shard = 0;
+    res = _dest_store->get_shard(&shard);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to get shard from store: " << 
_dest_store->path();
+        return res;
+    }
+    FilePathDescStream root_path_desc_s;
+    root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << 
shard;
+    FilePathDesc full_path_desc = 
SnapshotManager::instance()->get_schema_hash_full_path(
+            _tablet, root_path_desc_s.path_desc());
+    string full_path = full_path_desc.filepath;
+    // if dir already exist then return err, it should not happen.
+    // should not remove the dir directly, for safety reason.
+    if (FileUtils::check_exist(full_path)) {
+        LOG(INFO) << "schema hash path already exist, skip this path. "
+                    << "full_path=" << full_path;
+        return OLAP_ERR_FILE_ALREADY_EXIST;
+    }
+
+    Status st = FileUtils::create_dir(full_path);
+    if (!st.ok()) {
+        res = OLAP_ERR_CANNOT_CREATE_DIR;
+        LOG(WARNING) << "fail to create path. path=" << full_path
+                        << ", error:" << st.to_string();
+        return res;
+    }
 
+    do {
         // migrate all index and data files but header file
         res = _copy_index_and_data_files(full_path, consistent_rowsets);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to copy index and data files when migrate. 
res=" << res;
+            // we should remove the dir directly for avoid disk full of junk 
data, and it's safe to remove
+            FileUtils::remove_all(full_path);
+            return res;
+        }
+        UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
+        if (!migration_wlock.owns_lock()) {
+            LOG(WARNING) << "get migration lock fail.";
+            FileUtils::remove_all(full_path);
+            return OLAP_ERR_RWLOCK_ERROR;
+        }
+        res = _check_running_txns();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "check running txns fail. res=" << res;
+            FileUtils::remove_all(full_path);
+            return res;
+        }
+        _tablet->obtain_push_lock();

Review comment:
       Use RAII here as well for the push lock, so it is released on every return path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to