This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f96bc62  [feature](balance) Support balance between disks on a single 
BE (#8553)
f96bc62 is described below

commit f96bc6257324ca329bf55907d36782c4a15aa7b3
Author: yinzhijian <[email protected]>
AuthorDate: Mon Mar 28 10:03:21 2022 +0800

    [feature](balance) Support balance between disks on a single BE (#8553)
    
    The current situation in Doris is that the cluster is balanced, but the disks
of a backend may be unbalanced.
    For example, backend A has two disks: disk1 and disk2. disk1's usage is
98%, but disk2's usage is only 40%.
    disk1 is unable to take more data, therefore only one disk of backend A can
take new data,
    so the available write throughput of backend A is only half of its capacity,
and we cannot resolve this through load or
    partition rebalancing now.
    
    So we introduce the disk rebalancer. The disk rebalancer is different from the
other rebalancers (load and partition),
    which take care of cluster-wide data balancing; it takes care of
backend-wide data balancing.
    
    [For more details see 
#8550](https://github.com/apache/incubator-doris/issues/8550)
---
 be/src/agent/task_worker_pool.cpp                  |  13 +-
 be/src/agent/task_worker_pool.h                    |   2 +-
 be/src/common/config.h                             |   5 +
 be/src/olap/task/engine_storage_migration_task.cpp | 316 ++++++++++++++-----
 be/src/olap/task/engine_storage_migration_task.h   |  20 +-
 be/test/olap/CMakeLists.txt                        |   1 +
 .../olap/engine_storage_migration_task_test.cpp    | 302 +++++++++++++++++++
 docs/.vuepress/sidebar/en.js                       |   2 +
 docs/.vuepress/sidebar/zh-CN.js                    |   2 +
 .../Administration/ADMIN CANCEL REBALANCE DISK.md  |  51 ++++
 .../Administration/ADMIN REBALANCE DISK.md         |  52 ++++
 .../Administration/ADMIN CANCEL REBALANCE DISK.md  |  52 ++++
 .../Administration/ADMIN REBALANCE DISK.md         |  54 ++++
 fe/fe-core/src/main/cup/sql_parser.cup             |  20 +-
 .../analysis/AdminCancelRebalanceDiskStmt.java     |  73 +++++
 .../doris/analysis/AdminRebalanceDiskStmt.java     |  79 +++++
 .../apache/doris/clone/BackendLoadStatistic.java   |  58 +++-
 .../org/apache/doris/clone/DiskRebalancer.java     | 334 +++++++++++++++++++++
 .../java/org/apache/doris/clone/Rebalancer.java    |  33 +-
 .../org/apache/doris/clone/TabletSchedCtx.java     |  65 ++++
 .../org/apache/doris/clone/TabletScheduler.java    | 123 +++++++-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../java/org/apache/doris/master/MasterImpl.java   |  17 +-
 .../org/apache/doris/master/ReportHandler.java     |   4 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   6 +
 .../doris/task/StorageMediaMigrationTask.java      |  15 +
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   2 +
 .../analysis/AdminCancelRebalanceDiskStmtTest.java |  82 +++++
 .../doris/analysis/AdminRebalanceDiskStmtTest.java |  83 +++++
 .../{RebalanceTest.java => DiskRebalanceTest.java} | 228 ++++++--------
 .../java/org/apache/doris/clone/RebalanceTest.java |  29 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |  32 +-
 .../java/org/apache/doris/task/AgentTaskTest.java  |  15 +
 33 files changed, 1924 insertions(+), 252 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 29219fc..a10d2c4 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -941,8 +941,8 @@ void 
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
         TStatusCode::type status_code = TStatusCode::OK;
         // check request and get info
         TabletSharedPtr tablet;
-        DataDir* dest_store;
-        if (_check_migrate_requset(storage_medium_migrate_req, tablet, 
&dest_store) !=
+        DataDir* dest_store = nullptr;
+        if (_check_migrate_request(storage_medium_migrate_req, tablet, 
&dest_store) !=
             OLAP_SUCCESS) {
             status_code = TStatusCode::RUNTIME_ERROR;
         } else {
@@ -953,7 +953,7 @@ void 
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
                              << ", signature: " << agent_task_req.signature;
                 status_code = TStatusCode::RUNTIME_ERROR;
             } else {
-                LOG(INFO) << "storage media migrate success. status:" << res 
<< ","
+                LOG(INFO) << "storage media migrate success. status:" << res
                           << ", signature:" << agent_task_req.signature;
             }
         }
@@ -974,7 +974,7 @@ void 
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
     }
 }
 
-OLAPStatus TaskWorkerPool::_check_migrate_requset(const 
TStorageMediumMigrateReq& req,
+OLAPStatus TaskWorkerPool::_check_migrate_request(const 
TStorageMediumMigrateReq& req,
                                                   TabletSharedPtr& tablet, 
DataDir** dest_store) {
     int64_t tablet_id = req.tablet_id;
     int32_t schema_hash = req.schema_hash;
@@ -1020,6 +1020,11 @@ OLAPStatus TaskWorkerPool::_check_migrate_requset(const 
TStorageMediumMigrateReq
 
         *dest_store = stores[0];
     }
+    if (tablet->data_dir()->path() == (*dest_store)->path()) {
+            LOG(INFO) << "tablet is already on specified path. "
+                      << "path=" << tablet->data_dir()->path();
+            return OLAP_REQUEST_FAILED;
+    }
 
     // check disk capacity
     int64_t tablet_size = tablet->tablet_footprint();
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3cffbc7..4181d0c 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -208,7 +208,7 @@ private:
     Status _move_dir(const TTabletId tablet_id, const TSchemaHash schema_hash,
                           const std::string& src, int64_t job_id, bool 
overwrite);
 
-    OLAPStatus _check_migrate_requset(const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet,
+    OLAPStatus _check_migrate_request(const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet,
                                       DataDir** dest_store);
 
     // random sleep 1~second seconds
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 438a4a3..fe92b6b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -332,6 +332,11 @@ CONF_Int32(min_tablet_migration_threads, "1");
 CONF_Int32(max_tablet_migration_threads, "1");
 
 CONF_mInt32(finished_migration_tasks_size, "10000");
+// If size less than this, the remaining rowsets will be force to complete
+CONF_mInt32(migration_remaining_size_threshold_mb, "10");
+// If the task runs longer than this time, the task will be terminated, in 
seconds.
+// tablet max size / migration min speed * factor = 10GB / 1MBps * 2 = 20480 
seconds
+CONF_mInt32(migration_task_timeout_secs, "20480");
 
 // Port to start debug webserver on
 CONF_Int32(webserver_port, "8040");
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index 486730a..a3724a2 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/task/engine_storage_migration_task.h"
 
+#include <ctime>
+
 #include "olap/snapshot_manager.h"
 #include "olap/tablet_meta_manager.h"
 
@@ -24,146 +26,298 @@ namespace doris {
 
 using std::stringstream;
 
+const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
+
 EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& 
tablet,
                                                        DataDir* dest_store)
-        : _tablet(tablet), _dest_store(dest_store) {}
+        : _tablet(tablet), _dest_store(dest_store) {
+        _task_start_time = time(nullptr);
+    }
 
 OLAPStatus EngineStorageMigrationTask::execute() {
     return _migrate();
 }
 
-OLAPStatus EngineStorageMigrationTask::_migrate() {
-    int64_t tablet_id = _tablet->tablet_id();
-    int32_t schema_hash = _tablet->schema_hash();
-    LOG(INFO) << "begin to process tablet migrate. "
-              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
+OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, 
int32_t* end_version,
+                                        std::vector<RowsetSharedPtr> 
*consistent_rowsets) {
+    ReadLock rdlock(_tablet->get_header_lock());
+    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    if (last_version == nullptr) {
+        LOG(WARNING) << "failed to get rowset with max version, tablet="
+                        << _tablet->full_name();
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
 
-    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+    *end_version = last_version->end_version();
+    if (*end_version < start_version) {
+        // rowsets are empty
+        VLOG_DEBUG << "consistent rowsets empty. tablet=" << 
_tablet->full_name()
+                        << ", start_version=" << start_version << ", 
end_version=" << *end_version;
+        return OLAP_SUCCESS;
+    }
+    _tablet->capture_consistent_rowsets(Version(start_version, *end_version), 
consistent_rowsets);
+    if (consistent_rowsets->empty()) {
+        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << 
_tablet->full_name()
+                        << ", version=" << *end_version;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+    return OLAP_SUCCESS;
+}
 
-    // try hold migration lock first
-    OLAPStatus res = OLAP_SUCCESS;
-    UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
-    if (!migration_wlock.owns_lock()) {
-        return OLAP_ERR_RWLOCK_ERROR;
+bool EngineStorageMigrationTask::_is_timeout() {
+    int64_t time_elapsed = time(nullptr) - _task_start_time;
+    if (time_elapsed > config::migration_task_timeout_secs) {
+        LOG(WARNING) << "migration failed due to timeout, time_eplapsed=" << 
time_elapsed
+            << ", tablet=" << _tablet->full_name();
+        return true;
     }
+    return false;
+}
 
-    // check if this tablet has related running txns. if yes, can not do 
migration.
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+    // need hold migration lock outside
     int64_t partition_id;
     std::set<int64_t> transaction_ids;
+    // check if this tablet has related running txns. if yes, can not do 
migration.
     StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
-            tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id, 
&transaction_ids);
+            _tablet->tablet_id(), _tablet->schema_hash(), 
_tablet->tablet_uid(), &partition_id, &transaction_ids);
     if (transaction_ids.size() > 0) {
-        LOG(WARNING) << "could not migration because has unfinished txns, "
-                     << " tablet=" << _tablet->full_name();
         return OLAP_ERR_HEADER_HAS_PENDING_DATA;
     }
+    return OLAP_SUCCESS;
+}
 
-    std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
-    // TODO(ygl): the tablet should not under schema change or rollup or load
+OLAPStatus 
EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueWriteLock* 
migration_wlock) {
+    // caller should not hold migration lock, and 'migration_wlock' should not 
be nullptr
+    // ownership of the migration_wlock is transferred to the caller if check 
succ
+    DCHECK_NE(migration_wlock, nullptr);
+    OLAPStatus res = OLAP_SUCCESS;
+    int try_times = 1;
     do {
-        std::vector<RowsetSharedPtr> consistent_rowsets;
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            // get all versions to be migrate
-            const RowsetSharedPtr last_version = 
_tablet->rowset_with_max_version();
-            if (last_version == nullptr) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "failed to get rowset with max version, 
tablet="
-                             << _tablet->full_name();
-                break;
-            }
-            int32_t end_version = last_version->end_version();
-            res = _tablet->capture_consistent_rowsets(Version(0, end_version), 
&consistent_rowsets);
-            if (consistent_rowsets.empty()) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "fail to capture consistent rowsets. tablet=" 
<< _tablet->full_name()
-                             << ", version=" << end_version;
-                break;
-            }
+        // to avoid invalid loops, the lock is guaranteed to be acquired here
+        UniqueWriteLock wlock(_tablet->get_migration_lock());
+        res = _check_running_txns();
+        if (res == OLAP_SUCCESS) {
+            // transfer the lock to the caller
+            *migration_wlock = std::move(wlock);
+            return res;
+        }
+        LOG(INFO) << "check running txns fail, try again until timeout."
+             << " tablet=" << _tablet->full_name()
+             << ", try times=" << try_times
+             << ", res=" << res;
+        // unlock and sleep for a while, try again
+        wlock.unlock();
+        sleep(std::min(config::sleep_one_second * try_times, 
CHECK_TXNS_MAX_WAIT_TIME_SECS));
+        ++try_times;
+    } while (!_is_timeout());
+    return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
+                                        uint64_t shard,
+                                        const std::string& full_path,
+                                        const std::vector<RowsetSharedPtr>& 
consistent_rowsets) {
+    // need hold migration lock and push lock outside
+    OLAPStatus res = OLAP_SUCCESS;
+    int64_t tablet_id = _tablet->tablet_id();
+    int32_t schema_hash = _tablet->schema_hash();
+    TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
+    {
+        ReadLock rdlock(_tablet->get_header_lock());
+        _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
+    }
+    std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + 
".hdr";
+    res = new_tablet_meta->save(new_meta_file);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
+        return res;
+    }
+
+    // reset tablet id and rowset id
+    res = TabletMeta::reset_tablet_uid(new_meta_file);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file;
+        return res;
+    }
+    // it will change rowset id and its create time
+    // rowset create time is useful when load tablet from meta to check which 
tablet is the tablet to load
+    res = SnapshotManager::instance()->convert_rowset_ids(full_path, 
tablet_id, schema_hash);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to convert rowset id when do storage migration"
+                        << " path = " << full_path;
+        return res;
+    }
+    return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_reload_tablet(
+                                        const std::string& full_path) {
+    // need hold migration lock and push lock outside
+    OLAPStatus res = OLAP_SUCCESS;
+    int64_t tablet_id = _tablet->tablet_id();
+    int32_t schema_hash = _tablet->schema_hash();
+    res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
+            _dest_store, tablet_id, schema_hash, full_path, false);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to load tablet from new path. tablet_id=" << 
tablet_id
+                        << " schema_hash=" << schema_hash << " path = " << 
full_path;
+        return res;
+    }
+
+    // if old tablet finished schema change, then the schema change status of 
the new tablet is DONE
+    // else the schema change status of the new tablet is FAILED
+    TabletSharedPtr new_tablet =
+            StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, 
schema_hash);
+    if (new_tablet == nullptr) {
+        LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
+                        << " schema_hash=" << schema_hash;
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+    return res;
+}
+
+// if the size less than threshold, return true
+bool EngineStorageMigrationTask::_is_rowsets_size_less_than_threshold(
+        const std::vector<RowsetSharedPtr>& consistent_rowsets) {
+    size_t total_size = 0;
+    for (const auto& rs : consistent_rowsets) {
+        total_size += rs->index_disk_size() + rs->data_disk_size();
+    }
+    if (total_size < config::migration_remaining_size_threshold_mb) {
+        return true;
+    }
+    return false;
+}
+
+OLAPStatus EngineStorageMigrationTask::_migrate() {
+    int64_t tablet_id = _tablet->tablet_id();
+    LOG(INFO) << "begin to process tablet migrate. "
+              << "tablet_id=" << tablet_id << ", dest_store=" << 
_dest_store->path();
+
+    DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+    int32_t start_version = 0;
+    int32_t end_version = 0;
+    std::vector<RowsetSharedPtr> consistent_rowsets;
+
+    // try hold migration lock first
+    OLAPStatus res = OLAP_SUCCESS;
+    uint64_t shard = 0;
+    string full_path;
+    {
+        UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), 
std::try_to_lock);
+        if (!migration_wlock.owns_lock()) {
+            return OLAP_ERR_RWLOCK_ERROR;
+        }
+
+        // check if this tablet has related running txns. if yes, can not do 
migration.
+        res = _check_running_txns();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "could not migration because has unfinished txns, "
+                        << " tablet=" << _tablet->full_name();
+            return res;
         }
 
-        uint64_t shard = 0;
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        // get versions to be migrate
+        res = _get_versions(start_version, &end_version, &consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            return res;
+        }
+
+        // TODO(ygl): the tablet should not under schema change or rollup or 
load
         res = _dest_store->get_shard(&shard);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to get shard from store: " << 
_dest_store->path();
-            break;
+            return res;
         }
         FilePathDescStream root_path_desc_s;
         root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << 
shard;
         FilePathDesc full_path_desc = 
SnapshotManager::instance()->get_schema_hash_full_path(
                 _tablet, root_path_desc_s.path_desc());
-        string full_path = full_path_desc.filepath;
+        full_path = full_path_desc.filepath;
         // if dir already exist then return err, it should not happen.
         // should not remove the dir directly, for safety reason.
         if (FileUtils::check_exist(full_path)) {
             LOG(INFO) << "schema hash path already exist, skip this path. "
-                      << "full_path=" << full_path;
-            res = OLAP_ERR_FILE_ALREADY_EXIST;
-            break;
+                        << "full_path=" << full_path;
+            return OLAP_ERR_FILE_ALREADY_EXIST;
         }
 
         Status st = FileUtils::create_dir(full_path);
         if (!st.ok()) {
             res = OLAP_ERR_CANNOT_CREATE_DIR;
             LOG(WARNING) << "fail to create path. path=" << full_path
-                         << ", error:" << st.to_string();
-            break;
+                            << ", error:" << st.to_string();
+            return res;
         }
+    }
 
+    std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+    do {
         // migrate all index and data files but header file
-        res = _copy_index_and_data_files(full_path, consistent_rowsets);
+        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to copy index and data files when migrate. 
res=" << res;
             break;
         }
-
-        // generate new tablet meta and write to hdr file
-        TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
-        }
-        std::string new_meta_file = full_path + "/" + 
std::to_string(tablet_id) + ".hdr";
-        res = new_tablet_meta->save(new_meta_file);
+        UniqueWriteLock migration_wlock;
+        res = _check_running_txns_until_timeout(&migration_wlock);
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
             break;
         }
-
-        // reset tablet id and rowset id
-        res = TabletMeta::reset_tablet_uid(new_meta_file);
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        start_version = end_version;
+        // clear temp rowsets before get remaining versions
+        temp_consistent_rowsets.clear();
+        // get remaining versions
+        res = _get_versions(end_version + 1, &end_version, 
&temp_consistent_rowsets);
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file;
             break;
         }
-        // it will change rowset id and its create time
-        // rowset create time is useful when load tablet from meta to check 
which tablet is the tablet to load
-        res = SnapshotManager::instance()->convert_rowset_ids(full_path, 
tablet_id, schema_hash);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to convert rowset id when do storage 
migration"
-                         << " path = " << full_path;
-            break;
+        if (start_version < end_version) {
+            // we have remaining versions to be migrated
+            consistent_rowsets.insert(consistent_rowsets.end(),
+                        temp_consistent_rowsets.begin(), 
temp_consistent_rowsets.end());
+            LOG(INFO) << "we have remaining versions to be migrated. 
start_version="
+                << start_version << " end_version=" << end_version;
+            // if the remaining size is less than 
config::migration_remaining_size_threshold_mb(default 10MB),
+            // we take the lock to complete it to avoid long-term competition 
with other tasks
+            if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) 
{
+                // force to copy the remaining data and index
+                res = _copy_index_and_data_files(full_path, 
temp_consistent_rowsets);
+                if (res != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to copy the remaining index and data 
files when migrate. res=" << res;
+                    break;
+                }
+            } else {
+                if (_is_timeout()) {
+                    res = OLAP_ERR_HEADER_HAS_PENDING_DATA;
+                    break;
+                }
+                // there is too much remaining data here.
+                // in order not to affect other tasks, release the lock and 
then copy it
+                continue;
+            }
         }
 
-        res = 
StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
-                _dest_store, tablet_id, schema_hash, full_path, false);
+        // generate new tablet meta and write to hdr file
+        res = _gen_and_write_header_to_hdr_file(shard, full_path, 
consistent_rowsets);
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to load tablet from new path. tablet_id=" 
<< tablet_id
-                         << " schema_hash=" << schema_hash << " path = " << 
full_path;
             break;
         }
-
-        // if old tablet finished schema change, then the schema change status 
of the new tablet is DONE
-        // else the schema change status of the new tablet is FAILED
-        TabletSharedPtr new_tablet =
-                
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
-        if (new_tablet == nullptr) {
-            LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
-                         << " schema_hash=" << schema_hash;
-            res = OLAP_ERR_TABLE_NOT_FOUND;
+        res = _reload_tablet(full_path);
+        if (res != OLAP_SUCCESS) {
             break;
         }
-    } while (0);
+
+        break;
+    } while (true);
+
+    if (res != OLAP_SUCCESS) {
+        // we should remove the dir directly for avoid disk full of junk data, 
and it's safe to remove
+        FileUtils::remove_all(full_path);
+    }
     return res;
 }
 
diff --git a/be/src/olap/task/engine_storage_migration_task.h 
b/be/src/olap/task/engine_storage_migration_task.h
index 3009f4c..ffd42a4 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -36,6 +36,24 @@ public:
 
 private:
     OLAPStatus _migrate();
+    // check if task is timeout
+    bool _is_timeout();
+    OLAPStatus _get_versions(int32_t start_version,
+                                 int32_t* end_version,
+                                 std::vector<RowsetSharedPtr> 
*consistent_rowsets);
+    OLAPStatus _check_running_txns();
+    // caller should not hold migration lock, and 'migration_wlock' should not 
be nullptr
+    // ownership of the migration lock is transferred to the caller if check 
succ
+    OLAPStatus _check_running_txns_until_timeout(UniqueWriteLock* 
migration_wlock);
+
+    // if the size less than threshold, return true
+    bool _is_rowsets_size_less_than_threshold(const 
std::vector<RowsetSharedPtr>& consistent_rowsets);
+
+    OLAPStatus _gen_and_write_header_to_hdr_file(
+                            uint64_t shard,
+                            const std::string& full_path,
+                            const std::vector<RowsetSharedPtr>& 
consistent_rowsets);
+    OLAPStatus _reload_tablet(const std::string& full_path);
 
     void _generate_new_header(uint64_t new_shard,
                               const std::vector<RowsetSharedPtr>& 
consistent_rowsets,
@@ -52,7 +70,7 @@ private:
     TabletSharedPtr _tablet;
     // destination data dir
     DataDir* _dest_store;
-
+    int64_t _task_start_time;
 }; // EngineTask
 
 } // namespace doris
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 5e4d15f..81a551f 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -26,6 +26,7 @@ ADD_BE_TEST(row_block_test)
 ADD_BE_TEST(row_block_v2_test)
 ADD_BE_TEST(bit_field_test)
 ADD_BE_TEST(byte_buffer_test)
+ADD_BE_TEST(engine_storage_migration_task_test)
 ADD_BE_TEST(run_length_byte_test)
 ADD_BE_TEST(run_length_integer_test)
 ADD_BE_TEST(stream_index_test)
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
new file mode 100644
index 0000000..68c88f8
--- /dev/null
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task.h"
+
+#include <gtest/gtest.h>
+#include <sys/file.h>
+
+#include <string>
+
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/Types_types.h"
+#include "olap/delta_writer.h"
+#include "olap/field.h"
+#include "olap/options.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple.h"
+#include "util/file_utils.h"
+#include "util/logging.h"
+
+namespace doris {
+
+static const uint32_t MAX_PATH_LEN = 1024;
+
+StorageEngine* k_engine = nullptr;
+std::string path1;
+std::string path2;
+
+void set_up() {
+    char buffer[MAX_PATH_LEN];
+    ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+    path1 = std::string(buffer) + "/data_test_1";
+    path2 = std::string(buffer) + "/data_test_2";
+    config::storage_root_path = path1 + ";" + path2;
+    FileUtils::remove_all(path1);
+    FileUtils::create_dir(path1);
+
+    FileUtils::remove_all(path2);
+    FileUtils::create_dir(path2);
+    std::vector<StorePath> paths;
+    paths.emplace_back(path1, -1);
+    paths.emplace_back(path2, -1);
+
+    doris::EngineOptions options;
+    options.store_paths = paths;
+    Status s = doris::StorageEngine::open(options, &k_engine);
+    ASSERT_TRUE(s.ok()) << s.to_string();
+
+    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+    exec_env->set_storage_engine(k_engine);
+    k_engine->start_bg_threads();
+}
+
+void tear_down() {
+    if (k_engine != nullptr) {
+        k_engine->stop();
+        delete k_engine;
+        k_engine = nullptr;
+    }
+    ASSERT_EQ(system("rm -rf ./data_test_1"), 0);
+    ASSERT_EQ(system("rm -rf ./data_test_2"), 0);
+    FileUtils::remove_all(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX);
+}
+
+void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t 
schema_hash,
+                                             TCreateTabletReq* request) {
+    request->tablet_id = tablet_id;
+    request->__set_version(1);
+    request->tablet_schema.schema_hash = schema_hash;
+    request->tablet_schema.short_key_column_count = 2;
+    request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS;
+    request->tablet_schema.storage_type = TStorageType::COLUMN;
+    request->tablet_schema.__set_sequence_col_idx(2);
+
+    TColumn k1;
+    k1.column_name = "k1";
+    k1.__set_is_key(true);
+    k1.column_type.type = TPrimitiveType::TINYINT;
+    request->tablet_schema.columns.push_back(k1);
+
+    TColumn k2;
+    k2.column_name = "k2";
+    k2.__set_is_key(true);
+    k2.column_type.type = TPrimitiveType::SMALLINT;
+    request->tablet_schema.columns.push_back(k2);
+
+    TColumn sequence_col;
+    sequence_col.column_name = SEQUENCE_COL;
+    sequence_col.__set_is_key(false);
+    sequence_col.column_type.type = TPrimitiveType::INT;
+    sequence_col.__set_aggregation_type(TAggregationType::REPLACE);
+    request->tablet_schema.columns.push_back(sequence_col);
+
+    TColumn v1;
+    v1.column_name = "v1";
+    v1.__set_is_key(false);
+    v1.column_type.type = TPrimitiveType::DATETIME;
+    v1.__set_aggregation_type(TAggregationType::REPLACE);
+    request->tablet_schema.columns.push_back(v1);
+}
+
+TDescriptorTable create_descriptor_tablet_with_sequence_col() {
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder tuple_builder;
+
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build());
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build());
+    tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                   .type(TYPE_INT)
+                                   .column_name(SEQUENCE_COL)
+                                   .column_pos(2)
+                                   .build());
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build());
+    tuple_builder.build(&dtb);
+
+    return dtb.desc_tbl();
+}
+
+class TestEngineStorageMigrationTask : public ::testing::Test {
+public:
+    TestEngineStorageMigrationTask() {}
+    ~TestEngineStorageMigrationTask() {}
+
+    void SetUp() {
+        // Create local data dir for StorageEngine.
+        std::cout << "setup" << std::endl;
+    }
+
+    void TearDown() {
+        // Remove all dir.
+        std::cout << "tear down" << std::endl;
+        //doris::tear_down();
+        //ASSERT_EQ(OLAP_SUCCESS, remove_all_dir(config::storage_root_path));
+    }
+};
+
+TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
+    TCreateTabletReq request;
+    create_tablet_request_with_sequence_col(10005, 270068377, &request);
+    OLAPStatus res = k_engine->create_tablet(request);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+                              30003, load_id,   tuple_desc,      
&(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer);
+    ASSERT_NE(delta_writer, nullptr);
+
+    MemTracker tracker;
+    MemPool pool(&tracker);
+    // Tuple 1
+    {
+        Tuple* tuple = 
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
+        memset(tuple, 0, tuple_desc->byte_size());
+        *(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123;
+        *(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456;
+        *(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1;
+        ((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset())))
+                ->from_date_str("2020-07-16 19:39:43", 19);
+
+        res = delta_writer->write(tuple);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+    }
+
+    res = delta_writer->close();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait(nullptr, false);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    std::cout << "before publish, tablet row nums:" << tablet->num_rows() << 
std::endl;
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::cout << "start to add rowset version:" << version.first << "-" << 
version.second
+              << std::endl;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        std::cout << "start to publish txn" << std::endl;
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, 
write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, 
write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, 
version);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+        std::cout << "start to add inc rowset:" << rowset->rowset_id()
+                  << ", num rows:" << rowset->num_rows() << ", version:" << 
rowset->version().first
+                  << "-" << rowset->version().second << std::endl;
+        res = tablet->add_inc_rowset(rowset);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+    }
+    ASSERT_EQ(1, tablet->num_rows());
+    // we should sleep 1 second so that the migrated tablet has a different 
time from the current tablet
+    sleep(1);
+
+    // test case 1
+    // prepare
+    DataDir* dest_store = nullptr;
+    if (tablet->data_dir()->path() == path1) {
+        dest_store = StorageEngine::instance()->get_store(path2);
+    } else if (tablet->data_dir()->path() == path2) {
+        dest_store = StorageEngine::instance()->get_store(path1);
+    }
+    ASSERT_NE(dest_store, nullptr);
+    std::cout << "dest store:" << dest_store->path() << std::endl;
+    // migrating
+    EngineStorageMigrationTask engine_task(tablet, dest_store);
+    res = engine_task.execute();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    // reget the tablet from manager after migration
+    auto tablet_id = 10005;
+    auto schema_hash = 270068377;
+    TabletSharedPtr tablet2 = 
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    // check path
+    ASSERT_EQ(tablet2->data_dir()->path(), dest_store->path());
+    // check rows
+    ASSERT_EQ(1, tablet2->num_rows());
+    // tablet2 should not be equal to tablet
+    ASSERT_NE(tablet2, tablet);
+
+    // test case 2
+    // migrate tablet2 back to the tablet's path
+    // sleep 1 second for update time
+    sleep(1);
+    dest_store = 
StorageEngine::instance()->get_store(tablet->data_dir()->path());
+    ASSERT_NE(dest_store, nullptr);
+    ASSERT_NE(dest_store->path(), tablet2->data_dir()->path());
+    std::cout << "dest store:" << dest_store->path() << std::endl;
+    EngineStorageMigrationTask engine_task2(tablet2, dest_store);
+    res = engine_task2.execute();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    TabletSharedPtr tablet3 = 
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    // check path
+    ASSERT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path());
+    // check rows
+    ASSERT_EQ(1, tablet3->num_rows());
+    // tablet3 should not be equal to tablet2 or tablet
+    ASSERT_NE(tablet3, tablet2);
+    ASSERT_NE(tablet3, tablet);
+    // test case 2 end
+
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    delete delta_writer;
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    int ret = doris::OLAP_SUCCESS;
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    doris::set_up();
+    ret = RUN_ALL_TESTS();
+    doris::tear_down();
+    google::protobuf::ShutdownProtobufLibrary();
+    return ret;
+}
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 896805f..b96c51e 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -578,10 +578,12 @@ module.exports = [
             directoryPath: "Administration/",
             initialOpenGroupIndex: -1,
             children: [
+              "ADMIN CANCEL REBALANCE DISK",
               "ADMIN CANCEL REPAIR",
               "ADMIN CLEAN TRASH",
               "ADMIN CHECK TABLET",
               "ADMIN COMPACT",
+              "ADMIN REBALANCE DISK",
               "ADMIN REPAIR",
               "ADMIN SET CONFIG",
               "ADMIN SET REPLICA STATUS",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 30c5fc2..fc7c6bd 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -591,10 +591,12 @@ module.exports = [
             directoryPath: "Administration/",
             initialOpenGroupIndex: -1,
             children: [
+              "ADMIN CANCEL REBALANCE DISK",
               "ADMIN CANCEL REPAIR",
               "ADMIN CLEAN TRASH",
               "ADMIN CHECK TABLET",
               "ADMIN COMPACT",
+              "ADMIN REBALANCE DISK",
               "ADMIN REPAIR",
               "ADMIN SET CONFIG",
               "ADMIN SET REPLICA STATUS",
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL 
REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN 
CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000..475e266
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL 
REBALANCE DISK.md        
@@ -0,0 +1,51 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## Description
+
+This statement is used to cancel rebalancing disks of specified backends with 
high priority
+
+Grammar:
+
+ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", 
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only indicates that the system no longer rebalances disks of 
the specified backends with high priority. The system will still rebalance 
disks through its default scheduling.
+
+## example
+
+1. Cancel High Priority Disk Rebalance of all backends of the cluster
+
+ADMIN CANCEL REBALANCE DISK;
+
+2. Cancel High Priority Disk Rebalance of specified backends
+
+ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,CANCEL,REBALANCE DISK
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN 
REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN 
REBALANCE DISK.md
new file mode 100644
index 0000000..6e1c1aa
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE 
DISK.md       
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## Description
+
+This statement is used to try to rebalance disks of the specified backends 
first, regardless of whether the cluster is balanced
+
+Grammar:
+
+ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", 
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only means that the system attempts to rebalance disks of 
the specified backends with high priority, regardless of whether the cluster is 
balanced.
+2. The default timeout is 24 hours. After the timeout, the system will no 
longer rebalance disks of the specified backends with high priority; run this 
command again to re-enable it.
+
+## example
+
+1. Attempt to rebalance disks of all backends
+
+ADMIN REBALANCE DISK;
+
+2. Attempt to rebalance disks of the specified backends
+
+ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,REBALANCE,DISK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN 
CANCEL REBALANCE DISK.md 
b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE 
DISK.md
new file mode 100644
index 0000000..e697810
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL 
REBALANCE DISK.md     
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## description
+
+    该语句用于取消优先均衡BE的磁盘
+
+    语法:
+
+        ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", 
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+        1. 该语句仅表示系统不再优先均衡指定BE的磁盘数据。系统仍会以默认调度方式均衡BE的磁盘数据。
+        
+## example
+
+    1. 取消集群所有BE的优先磁盘均衡
+
+        ADMIN CANCEL REBALANCE DISK;
+
+    2. 取消指定BE的优先磁盘均衡
+
+        ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", 
"192.168.1.2:1234");
+
+## keyword
+    ADMIN,CANCEL,REBALANCE,DISK
+
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN 
REBALANCE DISK.md 
b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000..0bb78f5
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE 
DISK.md    
@@ -0,0 +1,54 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## description
+
+    该语句用于尝试优先均衡指定的BE磁盘数据
+
+    语法:
+
+        ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", 
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+        1. 该语句表示让系统尝试优先均衡指定BE的磁盘数据,不受限于集群是否均衡。
+        2. 默认的 timeout 是 24小时。超时意味着系统将不再优先均衡指定的BE磁盘数据。需要重新使用该命令设置。
+        3. 指定BE的磁盘数据均衡后,该BE的优先级将会失效。
+
+## example
+
+    1. 尝试优先均衡集群内的所有BE
+
+        ADMIN REBALANCE DISK;
+
+    2. 尝试优先均衡指定BE
+
+        ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+        
+## keyword
+    ADMIN,REBALANCE,DISK
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index da30388..2bf9da4 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -241,7 +241,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, 
KW_ALIAS, KW_ALL, KW_A
     KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, 
KW_COMMITTED, KW_COMPACT,
     KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, 
KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, 
KW_CURRENT_USER,
     KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, 
KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
-    KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, 
KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, 
KW_DROP, KW_DROPP, KW_DUPLICATE,
+    KW_DELETE, KW_UPDATE, KW_DISK, KW_DISTINCT, KW_DISTINCTPC, 
KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, 
KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
     KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, 
KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE,
     KW_EXISTS, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, KW_EXTRACT,
     KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, 
KW_FIELDS, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, 
KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
@@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, 
KW_ALIAS, KW_ALL, KW_A
     KW_PLUGIN, KW_PLUGINS,
     KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, 
KW_PROPERTY,
     KW_QUERY, KW_QUOTA,
-    KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP, 
KW_RELEASE, KW_RENAME,
+    KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, 
KW_REGEXP, KW_RELEASE, KW_RENAME,
     KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, 
KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, 
KW_RETURNS, KW_RESUME, KW_REVOKE,
     KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, 
KW_ROWS,
     KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, 
KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
@@ -5297,6 +5297,22 @@ admin_stmt ::=
     {:
         RESULT = new AdminCheckTabletsStmt(tabletIds, properties);
     :}
+    | KW_ADMIN KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
+    {:
+        RESULT = new AdminRebalanceDiskStmt(backends);
+    :}
+    | KW_ADMIN KW_REBALANCE KW_DISK
+    {:
+        RESULT = new AdminRebalanceDiskStmt(null);
+    :}
+    | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK KW_ON LPAREN 
string_list:backends RPAREN
+    {:
+        RESULT = new AdminCancelRebalanceDiskStmt(backends);
+    :}
+    | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK
+    {:
+        RESULT = new AdminCancelRebalanceDiskStmt(null);
+    :}
     | KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN
     {:
         RESULT = new AdminCleanTrashStmt(backends);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
new file mode 100644
index 0000000..3e9bab3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminCancelRebalanceDiskStmt extends DdlStmt {
+    private List<Backend> backends = Lists.newArrayList();
+
+    public AdminCancelRebalanceDiskStmt(List<String> backends) {
+        ImmutableMap<Long, Backend> backendsInfo = 
Catalog.getCurrentSystemInfo().getIdToBackend();
+        Map<String, Long> backendsID = new HashMap<String, Long>();
+        for (Backend backend : backendsInfo.values()) {
+            backendsID.put(String.valueOf(backend.getHost()) + ":" + 
String.valueOf(backend.getHeartbeatPort()), backend.getId());
+        }
+        if (backends == null) {
+            for (Backend backend : backendsInfo.values()) {
+                this.backends.add(backend);
+            }
+        } else {
+            for (String backend : backends) {
+                if (backendsID.get(backend) != null) {
+                    
this.backends.add(backendsInfo.get(backendsID.get(backend)));
+                    backendsID.remove(backend); // avoid repetition
+                }
+            }
+        }
+    }
+
+    public List<Backend> getBackends() {
+        return backends;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if 
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        }
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
+}
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
new file mode 100644
index 0000000..c8f0aa6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminRebalanceDiskStmt extends DdlStmt {
+    private List<Backend> backends = Lists.newArrayList();
+    private long timeoutS = 0;
+
+    public AdminRebalanceDiskStmt(List<String> backends) {
+        ImmutableMap<Long, Backend> backendsInfo = 
Catalog.getCurrentSystemInfo().getIdToBackend();
+        Map<String, Long> backendsID = new HashMap<String, Long>();
+        for (Backend backend : backendsInfo.values()) {
+            backendsID.put(String.valueOf(backend.getHost()) + ":" + 
String.valueOf(backend.getHeartbeatPort()), backend.getId());
+        }
+        if (backends == null) {
+            for (Backend backend : backendsInfo.values()) {
+                this.backends.add(backend);
+            }
+        } else {
+            for (String backend : backends) {
+                if (backendsID.get(backend) != null) {
+                    
this.backends.add(backendsInfo.get(backendsID.get(backend)));
+                    backendsID.remove(backend); // avoid repetition
+                }
+            }
+        }
+        timeoutS = 24 * 3600; // default 24 hours
+    }
+
+    public List<Backend> getBackends() {
+        return backends;
+    }
+
+    public long getTimeoutS() {
+        return timeoutS;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if 
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        }
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
+}
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 8c575d8..f01c017 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -246,8 +246,8 @@ public class BackendLoadStatistic {
             }
         }
 
-        LOG.debug("classify path by load. storage: {} avg used percent: {}. 
low/mid/high: {}/{}/{}",
-                avgUsedPercent, medium, lowCounter, midCounter, highCounter);
+        LOG.debug("classify path by load. be id: {} storage: {} avg used 
percent: {}. low/mid/high: {}/{}/{}",
+                beId, medium, avgUsedPercent, lowCounter, midCounter, 
highCounter);
     }
 
     public void calcScore(Map<TStorageMedium, Double> 
avgClusterUsedCapacityPercentMap,
@@ -315,6 +315,60 @@ public class BackendLoadStatistic {
         return status;
     }
 
+    /*
+     * Check whether the backend can be more balance if we migrate a tablet 
with size 'tabletSize' from
+     * `srcPath` to 'destPath'
+     * 1. recalculate the load score of src and dest path after migrate the 
tablet.
+     * 2. if the summary of the diff between the new score and average score 
becomes smaller, we consider it
+     *    as more balance.
+     */
+    public boolean isMoreBalanced(long srcPath, long destPath, long tabletId, 
long tabletSize,
+                                  TStorageMedium medium) {
+        long totalCapacity = 0;
+        long totalUsedCapacity = 0;
+        RootPathLoadStatistic srcPathStat = null;
+        RootPathLoadStatistic destPathStat = null;
+        for (RootPathLoadStatistic pathStat : pathStatistics) {
+            if (pathStat.getStorageMedium() == medium) {
+                totalCapacity += pathStat.getCapacityB();
+                totalUsedCapacity += pathStat.getUsedCapacityB();
+                if (pathStat.getPathHash() == srcPath) {
+                    srcPathStat = pathStat;
+                } else if (pathStat.getPathHash() == destPath) {
+                    destPathStat = pathStat;
+                }
+            }
+        }
+        if (srcPathStat == null || destPathStat == null) {
+            LOG.info("migrate {}(size: {}) from {} to {} failed, medium: {}, 
src or dest path stat does not exist.",
+                    tabletId, tabletSize, srcPath, destPath, medium);
+            return false;
+        }
+        double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity / 
(double) totalCapacity;
+        double currentSrcPathScore = srcPathStat.getCapacityB() == 0
+            ? 0.0 : srcPathStat.getUsedCapacityB() / (double) 
srcPathStat.getCapacityB();
+        double currentDestPathScore = destPathStat.getCapacityB() == 0
+            ? 0.0 : destPathStat.getUsedCapacityB() / (double) 
destPathStat.getCapacityB();
+
+        double newSrcPathScore = srcPathStat.getCapacityB() == 0
+            ? 0.0 : (srcPathStat.getUsedCapacityB() - tabletSize) / (double) 
srcPathStat.getCapacityB();
+        double newDestPathScore = destPathStat.getCapacityB() == 0
+            ? 0.0 : (destPathStat.getUsedCapacityB() + tabletSize) / (double) 
destPathStat.getCapacityB();
+
+        double currentDiff = Math.abs(currentSrcPathScore - avgUsedPercent)
+            + Math.abs(currentDestPathScore - avgUsedPercent);
+        double newDiff = Math.abs(newSrcPathScore - avgUsedPercent) + 
Math.abs(newDestPathScore - avgUsedPercent);
+
+        LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the 
load score changed."
+                        + " src: {} -> {}, dest: {}->{}, average score: {}. 
current diff: {}, new diff: {},"
+                        + " more balanced: {}",
+                tabletId, tabletSize, srcPath, destPath, medium, 
currentSrcPathScore, newSrcPathScore,
+                currentDestPathScore, newDestPathScore, avgUsedPercent, 
currentDiff, newDiff,
+                (newDiff < currentDiff));
+
+        return newDiff < currentDiff;
+    }
+
     public boolean hasAvailDisk() {
         for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) {
             if (rootPathLoadStatistic.getDiskState() == DiskState.ONLINE) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
new file mode 100644
index 0000000..f32e31a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletSchedCtx.BalanceType;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * This DiskRebalancer is different from other Rebalancers, which take care of
+ * cluster-wide data balancing.
+ * This DiskRebalancer chooses a backend and moves tablets from one disk to another.
+ * DiskRebalancer strategy:
+ * 1. only works while the cluster is balanced(which means the cluster has no 
high and mid load backends)
+ * 1.1 if user has given prio backends, then select tablets from prio backends 
no matter cluster is balanced or not.
+ * 2. selecting alternative tablets from mid load backends, and return them to 
tablet scheduler.
+ * 3. given a tablet which has src path(disk), find a path(disk) to migration.
+ */
+public class DiskRebalancer extends Rebalancer {
+    private static final Logger LOG = 
LogManager.getLogger(DiskRebalancer.class);
+
+    public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    public List<BackendLoadStatistic> 
filterByPrioBackends(List<BackendLoadStatistic> bes) {
+        List<BackendLoadStatistic> stats = Lists.newArrayList();
+        for (BackendLoadStatistic backend : bes) {
+            long backendId = backend.getBeId();
+            Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
+            if (timeoutS != 0) {
+                if (timeoutS < System.currentTimeMillis()) {
+                    // prioBackends stores beId -> expiry time; drop the backend once expired
+                    prioBackends.remove(backendId);
+                    continue;
+                }
+                stats.add(backend);
+            }
+        }
+        return stats;
+    }
+
+    // true means BE has low and high paths for balance after reclassification
+    private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long> 
pathMid, Set<Long> pathHigh) {
+        if (pathLow.isEmpty() && pathHigh.isEmpty()) {
+            // balanced
+            return false;
+        }
+        if (pathLow.isEmpty()) {
+            // mid => low
+            pathLow.addAll(pathMid);
+        } else if (pathHigh.isEmpty()) {
+            // mid => high
+            pathHigh.addAll(pathMid);
+        }
+        if (pathLow.isEmpty() || pathHigh.isEmpty()) {
+            // check again
+            return false;
+        }
+        return true;
+    }
+
+    /*
+     * Try to select alternative tablets to balance the disks.
+     * 1. Classify the backend into low, mid and high class by load score.
+     * 2. Try to select tablets from mid load backends.
+     *      1. Here we only select alternative tablets, without considering 
selected tablets' status,
+     *         and whether it is benefit for balance (All these will be 
checked in tablet scheduler)
+     *      2. Only select tablets from 'mid' backends.
+     *      3. Only select tablets from 'high' paths.
+     * 3. Try to select tablets from prio backends.
+     *
+     * Here we only select tablets from mid load node, do not set its dest, 
all this will be set
+     * when this tablet is being scheduled in tablet scheduler.
+     *
+     * NOTICE that we may select any available tablets here, ignore their 
state.
+     * The state will be checked when being scheduled in tablet scheduler.
+     */
+    @Override
+    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+            ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+        String clusterName = clusterStat.getClusterName();
+        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+
+        // get classification of backends
+        List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
+        List<BackendLoadStatistic> midBEs = Lists.newArrayList();
+        List<BackendLoadStatistic> highBEs = Lists.newArrayList();
+        clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, 
medium);
+
+        if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+            // the cluster is not balanced
+            if (prioBackends.isEmpty()) {
+                LOG.info("cluster is not balanced: {} with medium: {}. skip", 
clusterName, medium);
+                return alternativeTablets;
+            } else {
+                // prioBEs are not empty, we only schedule prioBEs' disk 
balance task
+                midBEs.addAll(lowBEs);
+                midBEs.addAll(highBEs);
+                midBEs = filterByPrioBackends(midBEs);
+            }
+        }
+
+        // first we should check if any mid backend is available.
+        // if no mid backend is available, we should not start balancing
+        if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
+            LOG.info("all mid load backends are dead: {} with medium: {}. skip",
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+            return alternativeTablets;
+        }
+
+        if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
+            LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+            return alternativeTablets;
+        }
+
+        Set<Long> unbalancedBEs = Sets.newHashSet();
+        // choose tablets from backends randomly.
+        Collections.shuffle(midBEs);
+        for (int i = midBEs.size() - 1; i >= 0; i--) {
+            BackendLoadStatistic beStat = midBEs.get(i);
+
+            // classify the paths.
+            Set<Long> pathLow = Sets.newHashSet();
+            Set<Long> pathMid = Sets.newHashSet();
+            Set<Long> pathHigh = Sets.newHashSet();
+            // we only select tablets from available high load path
+            beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
+            // check if BE has low and high paths for balance after 
reclassification
+            if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
+                continue;
+            }
+
+            // get all tablets on this backend, and shuffle them for random 
selection
+            List<Long> tabletIds = 
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
+            Collections.shuffle(tabletIds);
+
+            // for each path, we try to select at most 
BALANCE_SLOT_NUM_FOR_PATH tablets
+            Map<Long, Integer> remainingPaths = Maps.newHashMap();
+            for (Long pathHash : pathHigh) {
+                remainingPaths.put(pathHash, 
TabletScheduler.BALANCE_SLOT_NUM_FOR_PATH);
+            }
+
+            if (remainingPaths.isEmpty()) {
+                return alternativeTablets;
+            }
+
+            // select tablet from shuffled tablets
+            for (Long tabletId : tabletIds) {
+                Replica replica = invertedIndex.getReplica(tabletId, 
beStat.getBeId());
+                if (replica == null) {
+                    continue;
+                }
+                // ignore empty replicas as they do not make disk more 
balance. (disk usage)
+                if (replica.getDataSize() == 0) {
+                    continue;
+                }
+
+                // check if replica's is on 'high' path.
+                // and only select it if the selected tablets num of this path
+                // does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
+                long replicaPathHash = replica.getPathHash();
+                if (remainingPaths.containsKey(replicaPathHash)) {
+                    TabletMeta tabletMeta = 
invertedIndex.getTabletMeta(tabletId);
+                    if (tabletMeta == null) {
+                        continue;
+                    }
+
+                    TabletSchedCtx tabletCtx = new 
TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+                            tabletMeta.getDbId(), tabletMeta.getTableId(), 
tabletMeta.getPartitionId(),
+                            tabletMeta.getIndexId(), tabletId, null /* replica 
alloc is not used for balance*/,
+                            System.currentTimeMillis());
+                    // we set temp src here to simplify completeSchedCtx 
method, and avoid take slot here
+                    tabletCtx.setTempSrc(replica);
+                    tabletCtx.setTag(clusterStat.getTag());
+                    if (prioBackends.containsKey(beStat.getBeId())) {
+                        // priority of balance task of prio BE is NORMAL 
+                        tabletCtx.setOrigPriority(Priority.NORMAL);
+                    } else {
+                        // balance task's default priority is LOW
+                        tabletCtx.setOrigPriority(Priority.LOW);
+                    }
+                    // we must set balanceType to DISK_BALANCE for create 
migration task
+                    tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
+
+                    alternativeTablets.add(tabletCtx);
+                    unbalancedBEs.add(beStat.getBeId());
+                    // update remaining paths
+                    int remaining = remainingPaths.get(replicaPathHash) - 1;
+                    if (remaining <= 0) {
+                        remainingPaths.remove(replicaPathHash);
+                    } else {
+                        remainingPaths.put(replicaPathHash, remaining);
+                    }
+                }
+            }
+        } // end for mid backends
+
+        // remove balanced BEs from prio backends
+        prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
+        LOG.info("select alternative tablets for cluster: {}, medium: {}, num: 
{}, detail: {}",
+                clusterName, medium, alternativeTablets.size(),
+                
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        return alternativeTablets;
+    }
+
+    /*
+     * Create a StorageMediaMigrationTask of this selected tablet for balance.
+     * 1. Check if the cluster is balanced. if not, the balance will be 
cancelled.
+     * 2. Check if the src replica still on high load path. If not, the 
balance will be cancelled.
+     * 3. Select a low load path from this backend as destination.
+     */
+    @Override
+    public void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> 
backendsWorkingSlots) throws SchedException {
+        ClusterLoadStatistic clusterStat = 
statisticMap.get(tabletCtx.getCluster(), tabletCtx.getTag());
+        if (clusterStat == null) {
+            throw new SchedException(Status.UNRECOVERABLE, "cluster does not 
exist");
+        }
+        if (tabletCtx.getTempSrcBackendId() == -1 || 
tabletCtx.getTempSrcPathHash() == -1) {
+            throw new SchedException(Status.UNRECOVERABLE,
+                "src does not appear to be set correctly, something goes 
wrong");
+        }
+        Replica replica = invertedIndex.getReplica(tabletCtx.getTabletId(), 
tabletCtx.getTempSrcBackendId());
+        // check src replica still there
+        if (replica == null || replica.getPathHash() != 
tabletCtx.getTempSrcPathHash()) {
+            throw new SchedException(Status.UNRECOVERABLE, "src replica may be 
rebalanced");
+        }
+        // ignore empty replicas as they do not make disk more balance
+        if (replica.getDataSize() == 0) {
+            throw new SchedException(Status.UNRECOVERABLE, "size of src 
replica is zero");
+        }
+        // check src slot
+        PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+        if (slot == null) {
+            LOG.debug("BE does not have slot: {}", replica.getBackendId());
+            throw new SchedException(Status.UNRECOVERABLE, "unable to take src 
slot");
+        }
+        long pathHash = slot.takeBalanceSlot(replica.getPathHash());
+        if (pathHash == -1) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to take src 
slot");
+        }
+        // after take src slot, we can set src replica now
+        tabletCtx.setSrc(replica);
+
+        BackendLoadStatistic beStat = 
clusterStat.getBackendLoadStatistic(replica.getBackendId());
+        if (!beStat.isAvailable()) {
+            throw new SchedException(Status.UNRECOVERABLE, "the backend is not 
available");
+        }
+        // classify the paths.
+        // If src path is 'high', then we can select path from 'low' and 'mid'
+        // If src path is 'mid', then we can only select path from 'low'
+        // If src path is 'low', then we have nothing to do
+        Set<Long> pathLow = Sets.newHashSet();
+        Set<Long> pathMid = Sets.newHashSet();
+        Set<Long> pathHigh = Sets.newHashSet();
+        beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, 
tabletCtx.getStorageMedium());
+        if (pathHigh.contains(replica.getPathHash())) {
+            pathLow.addAll(pathMid);
+        } else if (!pathMid.contains(replica.getPathHash())) {
+            throw new SchedException(Status.UNRECOVERABLE, "src path is low 
load");
+        }
+        // check if this migration task can make the be's disks more balance.
+        List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
+        BalanceStatus bs;
+        if ((bs = beStat.isFit(tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium(), availPaths,
+                false /* not supplement */)) != BalanceStatus.OK) {
+            LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), 
bs.getErrMsgs());
+            throw new SchedException(Status.UNRECOVERABLE, "tablet not fit in 
BE");
+        }
+        // Select a low load path as destination.
+        boolean setDest = false;
+        for (RootPathLoadStatistic stat : availPaths) {
+            // check if avail path is src path
+            if (stat.getPathHash() == replica.getPathHash()) {
+                continue;
+            }
+            // check if avail path is low path
+            if (!pathLow.contains(stat.getPathHash())) {
+                LOG.debug("the path :{} is not low load", stat.getPathHash());
+                continue;
+            }
+            if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(), 
stat.getPathHash(),
+                    tabletCtx.getTabletId(), tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium())) {
+                LOG.debug("the path :{} can not make more balance", 
stat.getPathHash());
+                continue;
+            }
+            long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
+            if (destPathHash == -1) {
+                throw new SchedException(Status.UNRECOVERABLE, "unable to take 
dest slot");
+            }
+            tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
+            setDest = true;
+            break;
+        }
+
+        if (!setDest) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to find low 
load path");
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 1fca406..a7177c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -20,11 +20,13 @@ package org.apache.doris.clone;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 
@@ -50,6 +52,8 @@ public abstract class Rebalancer {
     protected Table<String, Tag, ClusterLoadStatistic> statisticMap = 
HashBasedTable.create();
     protected TabletInvertedIndex invertedIndex;
     protected SystemInfoService infoService;
+    // be id -> end time of prio
+    protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
 
     public Rebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex) {
         this.infoService = infoService;
@@ -71,10 +75,14 @@ public abstract class Rebalancer {
     protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             ClusterLoadStatistic clusterStat, TStorageMedium medium);
 
-    public void createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, 
PathSlot> backendsWorkingSlots,
-                                  AgentBatchTask batchTask) throws 
SchedException {
+    public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, 
PathSlot> backendsWorkingSlots)
+            throws SchedException {
         completeSchedCtx(tabletCtx, backendsWorkingSlots);
-        batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+        if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.BE_BALANCE) {
+            return tabletCtx.createCloneReplicaAndTask();
+        } else {
+            return tabletCtx.createStorageMediaMigrationTask();
+        }
     }
 
     // Before createCloneReplicaAndTask, we need to complete the 
TabletSchedCtx.
@@ -93,4 +101,21 @@ public abstract class Rebalancer {
     public void updateLoadStatistic(Table<String, Tag, ClusterLoadStatistic> 
statisticMap) {
         this.statisticMap = statisticMap;
     }
+
+    public void addPrioBackends(List<Backend> backends, long timeoutS) {
+        long currentTimeMillis = System.currentTimeMillis();
+        for (Backend backend : backends) {
+            prioBackends.put(backend.getId(), currentTimeMillis + timeoutS);
+        }
+    }
+
+    public void removePrioBackends(List<Backend> backends) {
+        for (Backend backend : backends) {
+            prioBackends.remove(backend.getId());
+        }
+    }
+
+    public boolean hasPrioBackends() {
+        return !prioBackends.isEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 6610b48..60e8080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -40,6 +40,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CloneTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TStatusCode;
@@ -108,6 +109,10 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         BALANCE, REPAIR
     }
 
+    public enum BalanceType {
+        BE_BALANCE, DISK_BALANCE 
+    }
+
     public enum Priority {
         LOW,
         NORMAL,
@@ -141,6 +146,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     }
     
     private Type type;
+    private BalanceType balanceType;
 
     /*
      * origPriority is the origin priority being set when this tablet being 
added to scheduler.
@@ -193,11 +199,16 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     
     private Replica srcReplica = null;
     private long srcPathHash = -1;
+    // for disk balance to keep src path, and avoid take slot on 
selectAlternativeTabletsForCluster
+    private Replica tempSrcReplica = null;
     private long destBackendId = -1;
     private long destPathHash = -1;
+    // for disk balance to set migration task's datadir
+    private String destPath = null;
     private String errMsg = null;
     
     private CloneTask cloneTask = null;
+    private StorageMediaMigrationTask storageMediaMigrationTask = null;
     
     // statistics gathered from clone task report
     // the total size of clone files and the total cost time in ms.
@@ -227,6 +238,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         this.infoService = Catalog.getCurrentSystemInfo();
         this.state = State.PENDING;
         this.replicaAlloc = replicaAlloc;
+        this.balanceType = BalanceType.BE_BALANCE;
     }
 
     public ReplicaAllocation getReplicaAlloc() {
@@ -249,6 +261,14 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         return type;
     }
 
+    public void setBalanceType(BalanceType type) {
+        this.balanceType = type;
+    }
+
+    public BalanceType getBalanceType() {
+        return balanceType;
+    }
+
     public Priority getOrigPriority() {
         return origPriority;
     }
@@ -380,6 +400,11 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         this.destBackendId = destBeId;
         this.destPathHash = destPathHash;
     }
+
+    public void setDest(Long destBeId, long destPathHash, String destPath) {
+        setDest(destBeId, destPathHash);
+        this.destPath = destPath;
+    }
     
     public void setErrMsg(String errMsg) {
         this.errMsg = errMsg;
@@ -414,6 +439,24 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         this.srcPathHash = srcReplica.getPathHash();
     }
 
+    public void setTempSrc(Replica srcReplica) {
+        this.tempSrcReplica = srcReplica;
+    }
+
+    public long getTempSrcBackendId() {
+        if (tempSrcReplica != null) {
+            return tempSrcReplica.getBackendId();
+        }
+        return -1;
+    }
+
+    public long getTempSrcPathHash() {
+        if (tempSrcReplica != null) {
+            return tempSrcReplica.getPathHash();
+        }
+        return -1;
+    }
+
     public long getDestBackendId() {
         return destBackendId;
     }
@@ -422,6 +465,10 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         return destPathHash;
     }
 
+    public String getDestPath() {
+        return destPath;
+    }
+
     // database lock should be held.
     public long getTabletSize() {
         long max = Long.MIN_VALUE;
@@ -687,6 +734,9 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
             }
         }
         
+        if (storageMediaMigrationTask != null) {
+            
AgentTaskQueue.removeTask(storageMediaMigrationTask.getBackendId(), 
TTaskType.STORAGE_MEDIUM_MIGRATE, storageMediaMigrationTask.getSignature());
+        }
         if (cloneTask != null) {
             AgentTaskQueue.removeTask(cloneTask.getBackendId(), 
TTaskType.CLONE, cloneTask.getSignature());
 
@@ -729,13 +779,28 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
             this.srcPathHash = -1;
             this.destBackendId = -1;
             this.destPathHash = -1;
+            this.destPath = null;
             this.cloneTask = null;
+            this.storageMediaMigrationTask = null;
         }
     }
     
     public void deleteReplica(Replica replica) {
         tablet.deleteReplicaByBackendId(replica.getBackendId());
     }
+
+    public StorageMediaMigrationTask createStorageMediaMigrationTask() throws 
SchedException {
+        storageMediaMigrationTask = new 
StorageMediaMigrationTask(getSrcBackendId(), getTabletId(),
+                getSchemaHash(), getStorageMedium());
+        if (destPath == null || destPath.isEmpty()) {
+            throw new SchedException(Status.UNRECOVERABLE,
+                "backend " + srcReplica.getBackendId() + ", dest path is 
empty");
+        }
+        storageMediaMigrationTask.setDataDir(destPath);
+        this.taskTimeoutMs = getApproximateTimeoutMs();
+        this.state = State.RUNNING;
+        return storageMediaMigrationTask;
+    }
     
     // database lock should be held.
     public CloneTask createCloneReplicaAndTask() throws SchedException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1f80cad..68ab63a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.clone;
 
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -51,7 +53,9 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CloneTask;
 import org.apache.doris.task.DropReplicaTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
@@ -101,13 +105,14 @@ public class TabletScheduler extends MasterDaemon {
 
     private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
 
-    public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;
+    // 1 slot to reduce unnecessary balance tasks and provide a more accurate estimate of capacity
+    public static final int BALANCE_SLOT_NUM_FOR_PATH = 1; 
 
     /*
      * Tablet is added to pendingTablets as well it's id in allTabletIds.
      * TabletScheduler will take tablet from pendingTablets but will not 
remove it's id from allTabletIds when
      * handling a tablet.
-     * Tablet' id can only be removed after the clone task is done(timeout, 
cancelled or finished).
+     * Tablet' id can only be removed after the clone task or migration task 
is done(timeout, cancelled or finished).
      * So if a tablet's id is still in allTabletIds, TabletChecker can not add 
tablet to TabletScheduler.
      *
      * pendingTablets + runningTablets = allTabletIds
@@ -135,6 +140,7 @@ public class TabletScheduler extends MasterDaemon {
     private ColocateTableIndex colocateTableIndex;
     private TabletSchedulerStat stat;
     private Rebalancer rebalancer;
+    private Rebalancer diskRebalancer;
 
     // result of adding a tablet to pendingTablets
     public enum AddResult {
@@ -157,6 +163,8 @@ public class TabletScheduler extends MasterDaemon {
         } else {
             this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
         }
+        // if rebalancer can not get new task, then use diskRebalancer to get 
task
+        this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
     }
 
     public TabletSchedulerStat getStat() {
@@ -244,6 +252,14 @@ public class TabletScheduler extends MasterDaemon {
         return allTabletIds.contains(tabletId);
     }
 
+    public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
+        diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS());
+    }
+
+    public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt 
stmt) {
+        diskRebalancer.removePrioBackends(stmt.getBackends());
+    }
+
     /**
      * Iterate current tablets, change their priority to VERY_HIGH if 
necessary.
      */
@@ -300,6 +316,7 @@ public class TabletScheduler extends MasterDaemon {
 
         updateClusterLoadStatistic();
         rebalancer.updateLoadStatistic(statisticMap);
+        diskRebalancer.updateLoadStatistic(statisticMap);
 
         adjustPriorities();
 
@@ -463,7 +480,6 @@ public class TabletScheduler extends MasterDaemon {
      * Try to schedule a single tablet.
      */
     private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask 
batchTask) throws SchedException {
-        LOG.debug("schedule tablet: {}, type: {}, status: {}", 
tabletCtx.getTabletId(), tabletCtx.getType(), tabletCtx.getTabletStatus());
         long currentTime = System.currentTimeMillis();
         tabletCtx.setLastSchedTime(currentTime);
         tabletCtx.setLastVisitedTime(currentTime);
@@ -561,6 +577,11 @@ public class TabletScheduler extends MasterDaemon {
                 throw new SchedException(Status.UNRECOVERABLE, "tablet is 
unhealthy when doing balance");
             }
 
+            // for more accurate disk balance, we only schedule the tablet when we have the latest stat info about the disk
+            if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE &&
+                   tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+                checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(), 
tabletCtx.getTempSrcPathHash());
+            }
             // we do not concern priority here.
             // once we take the tablet out of priority queue, priority is 
meaningless.
             tabletCtx.setTablet(tablet);
@@ -574,6 +595,25 @@ public class TabletScheduler extends MasterDaemon {
         }
     }
 
+    private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws 
SchedException {
+        PathSlot pathSlot = backendsWorkingSlots.get(beId);
+        if (pathSlot == null) {
+            throw new SchedException(Status.UNRECOVERABLE, "path slot does not 
exist");
+        }
+        long succTime = pathSlot.getDiskBalanceLastSuccTime(pathHash);
+        if (succTime > lastStatUpdateTime) {
+            throw new SchedException(Status.UNRECOVERABLE, "stat info is 
outdated");
+        }
+    }
+
+    public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
+        PathSlot pathSlot = backendsWorkingSlots.get(beId);
+        if (pathSlot == null) {
+            return;
+        }
+        pathSlot.updateDiskBalanceLastSuccTime(pathHash);
+    }
+
     private void handleTabletByTypeAndStatus(TabletStatus status, 
TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
             throws SchedException {
         if (tabletCtx.getType() == Type.REPAIR) {
@@ -1189,6 +1229,21 @@ public class TabletScheduler extends MasterDaemon {
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
             addTablet(tabletCtx, false);
         }
+        if (Config.disable_disk_balance) {
+            LOG.info("disk balance is disabled. skip selecting tablets for 
disk balance");
+            return;
+        }
+        List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
+        // if the default rebalancer can not get a new task, or the user has given prio BEs, then use the disk rebalancer to get tasks
+        if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
+            diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
+        }
+        for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
+            // add the task if it comes from a prio backend, or if the cluster is already balanced
+            if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == 
TabletSchedCtx.Priority.NORMAL) {
+                addTablet(tabletCtx, false);
+            }
+        }
     }
 
     /**
@@ -1196,7 +1251,18 @@ public class TabletScheduler extends MasterDaemon {
      */
     private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) 
throws SchedException {
         stat.counterBalanceSchedule.incrementAndGet();
-        rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, 
batchTask);
+        AgentTask task = null;
+        if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            task = diskRebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+            checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), 
tabletCtx.getSrcPathHash());
+            checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), 
tabletCtx.getDestPathHash());
+        } else if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.BE_BALANCE) {
+            task = rebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+        } else {
+            throw new SchedException(Status.UNRECOVERABLE,
+                "unknown balance type: " + 
tabletCtx.getBalanceType().toString());
+        }
+        batchTask.addTask(task);
     }
 
     // choose a path on a backend which is fit for the tablet
@@ -1347,7 +1413,7 @@ public class TabletScheduler extends MasterDaemon {
     // get next batch of tablets from queue.
     private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
         List<TabletSchedCtx> list = Lists.newArrayList();
-        int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
+        int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
         while (count > 0) {
             TabletSchedCtx tablet = pendingTablets.poll();
             if (tablet == null) {
@@ -1368,6 +1434,29 @@ public class TabletScheduler extends MasterDaemon {
         return total;
     }
 
+    public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask 
migrationTask,
+                        TFinishTaskRequest request) {
+        long tabletId = migrationTask.getTabletId();
+        TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
+        if (tabletCtx == null) {
+            // tablet does not exist, the task may be created by 
ReportHandler.tabletReport(ssd => hdd)
+            LOG.warn("tablet info does not exist: {}", tabletId);
+            return true;
+        }
+        if (tabletCtx.getBalanceType() != 
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            // this should not happen
+            LOG.warn("task type is not as excepted. tablet {}", tabletId);
+            return true;
+        }
+        if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) {
+            // if we have a successful task, then the stat must be refreshed before scheduling a new task
+            updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), 
tabletCtx.getSrcPathHash());
+            updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), 
tabletCtx.getDestPathHash());
+        }
+        // we need this function to free the slot held by this migration task
+        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
+        return true;
+    }
+
     /**
      * return true if we want to remove the clone task from AgentTaskQueue
      */
@@ -1379,6 +1468,11 @@ public class TabletScheduler extends MasterDaemon {
             // tablet does not exist, no need to keep task.
             return true;
         }
+        if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            // this should not happen
+            LOG.warn("task type is not as excepted. tablet {}", tabletId);
+            return true;
+        }
 
         Preconditions.checkState(tabletCtx.getState() == 
TabletSchedCtx.State.RUNNING, tabletCtx.getState());
         try {
@@ -1706,6 +1800,22 @@ public class TabletScheduler extends MasterDaemon {
             slot.balanceSlot++;
             slot.rectify();
         }
+
+        public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
+            Slot slot = pathSlots.get(pathHash);
+            if (slot == null) {
+                return;
+            }
+            slot.diskBalanceLastSuccTime = System.currentTimeMillis();
+        }
+
+        public synchronized long getDiskBalanceLastSuccTime(long pathHash) {
+            Slot slot = pathSlots.get(pathHash);
+            if (slot == null) {
+                return 0L;
+            }
+            return slot.diskBalanceLastSuccTime;
+        }
     }
 
     public List<List<String>> getSlotsInfo() {
@@ -1726,6 +1836,9 @@ public class TabletScheduler extends MasterDaemon {
         public long totalCopySize = 0;
         public long totalCopyTimeMs = 0;
 
+        // for disk balance
+        public long diskBalanceLastSuccTime = 0;
+
         public Slot(int total) {
             this.total = total;
             this.available = total;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index a9a8b9e..1551676 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1133,6 +1133,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_balance = false;
 
+    /**
+     * if set to true, TabletScheduler will not do disk balance.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean disable_disk_balance = false;
+
     // if the number of scheduled tablets in TabletScheduler exceed 
max_scheduling_tablets
     // skip checking.
     @ConfField(mutable = true, masterOnly = true)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 5230f9f..f0ae6a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -47,6 +47,7 @@ import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.task.UpdateTabletMetaInfoTask;
 import org.apache.doris.task.UploadTask;
 import org.apache.doris.thrift.TBackend;
@@ -115,8 +116,8 @@ public class MasterImpl {
         
         AgentTask task = AgentTaskQueue.getTask(backendId, taskType, 
signature);
         if (task == null) {
-            if (taskType != TTaskType.DROP && taskType != 
TTaskType.STORAGE_MEDIUM_MIGRATE
-                    && taskType != TTaskType.RELEASE_SNAPSHOT && taskType != 
TTaskType.CLEAR_TRANSACTION_TASK) {
+            if (taskType != TTaskType.DROP && taskType != 
TTaskType.RELEASE_SNAPSHOT
+                    && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
                 String errMsg = "cannot find task. type: " + taskType + ", 
backendId: " + backendId
                         + ", signature: " + signature;
                 LOG.warn(errMsg);
@@ -137,7 +138,8 @@ public class MasterImpl {
                 if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != 
TTaskType.UPLOAD
                         && taskType != TTaskType.DOWNLOAD && taskType != 
TTaskType.MOVE
                         && taskType != TTaskType.CLONE && taskType != 
TTaskType.PUBLISH_VERSION
-                        && taskType != TTaskType.CREATE && taskType != 
TTaskType.UPDATE_TABLET_META_INFO) {
+                        && taskType != TTaskType.CREATE && taskType != 
TTaskType.UPDATE_TABLET_META_INFO
+                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
                     return result;
                 }
             }
@@ -175,6 +177,9 @@ public class MasterImpl {
                 case CLONE:
                     finishClone(task, request);
                     break;
+                case STORAGE_MEDIUM_MIGRATE:
+                    finishStorageMediumMigrate(task, request);
+                    break;
                 case CHECK_CONSISTENCY:
                     finishConsistencyCheck(task, request);
                     break;
@@ -699,6 +704,12 @@ public class MasterImpl {
         AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, 
task.getSignature());
     }
 
+    private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest 
request) {
+        StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask) 
task;
+        
Catalog.getCurrentCatalog().getTabletScheduler().finishStorageMediaMigrationTask(migrationTask,
 request);
+        AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.STORAGE_MEDIUM_MIGRATE, task.getSignature());
+    }
+
     private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest 
request) {
         CheckConsistencyTask checkConsistencyTask = (CheckConsistencyTask) 
task;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index ea4abd9..e1f1051 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -360,10 +360,12 @@ public class ReportHandler extends Daemon {
             // 1. CREATE
             // 2. SYNC DELETE
             // 3. CHECK_CONSISTENCY
+            // 4. STORAGE_MEDIUM_MIGRATE
             if (task.getTaskType() == TTaskType.CREATE
                     || (task.getTaskType() == TTaskType.PUSH && ((PushTask) 
task).getPushType() == TPushType.DELETE
                     && ((PushTask) task).isSyncDelete())
-                    || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
+                    || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
+                    || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) 
{
                 continue;
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index c66349a..c35a27b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt;
 import org.apache.doris.analysis.AdminCheckTabletsStmt;
 import org.apache.doris.analysis.AdminCleanTrashStmt;
 import org.apache.doris.analysis.AdminCompactTableStmt;
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
 import org.apache.doris.analysis.AdminRepairTableStmt;
 import org.apache.doris.analysis.AdminSetConfigStmt;
 import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
@@ -281,6 +283,10 @@ public class DdlExecutor {
             catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
         } else if (ddlStmt instanceof AdminCleanTrashStmt) {
             catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt);
+        } else if (ddlStmt instanceof AdminRebalanceDiskStmt) {
+            
catalog.getTabletScheduler().rebalanceDisk((AdminRebalanceDiskStmt) ddlStmt);
+        } else if (ddlStmt instanceof AdminCancelRebalanceDiskStmt) {
+            
catalog.getTabletScheduler().cancelRebalanceDisk((AdminCancelRebalanceDiskStmt) 
ddlStmt);
         } else if (ddlStmt instanceof CreateSqlBlockRuleStmt) {
             
catalog.getSqlBlockRuleMgr().createSqlBlockRule((CreateSqlBlockRuleStmt) 
ddlStmt);
         } else if (ddlStmt instanceof AlterSqlBlockRuleStmt) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
index 72aef5a..1ddbde7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
@@ -21,10 +21,14 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageMediumMigrateReq;
 import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.base.Strings;
+
 public class StorageMediaMigrationTask extends AgentTask {
 
     private int schemaHash;
     private TStorageMedium toStorageMedium;
+    // if dataDir is specified, toStorageMedium is meaningless
+    private String dataDir;
 
     public StorageMediaMigrationTask(long backendId, long tabletId, int 
schemaHash,
                                      TStorageMedium toStorageMedium) {
@@ -36,9 +40,20 @@ public class StorageMediaMigrationTask extends AgentTask {
 
     public TStorageMediumMigrateReq toThrift() {
         TStorageMediumMigrateReq request = new 
TStorageMediumMigrateReq(tabletId, schemaHash, toStorageMedium);
+        if (!Strings.isNullOrEmpty(dataDir)) {
+            request.setDataDir(dataDir);
+        }
         return request;
     }
 
+    public String getDataDir() {
+        return dataDir;
+    }
+
+    public void setDataDir(String dataDir) {
+        this.dataDir = dataDir;
+    }
+
     public int getSchemaHash() {
         return schemaHash;
     }
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index a2eeba9..7ab3c7e 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -171,6 +171,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("distinctpcsa", new 
Integer(SqlParserSymbols.KW_DISTINCTPCSA));
         keywordMap.put("distributed", new 
Integer(SqlParserSymbols.KW_DISTRIBUTED));
         keywordMap.put("distribution", new 
Integer(SqlParserSymbols.KW_DISTRIBUTION));
+        keywordMap.put("disk", new Integer(SqlParserSymbols.KW_DISK));
         keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC));
         keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
         keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
@@ -319,6 +320,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));
         keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ));
         keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
+        keywordMap.put("rebalance", new 
Integer(SqlParserSymbols.KW_REBALANCE));
         keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
         keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
         keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
new file mode 100644
index 0000000..8e92567
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminCancelRebalanceDiskStmtTest {
+
+    private static Analyzer analyzer;
+
+    @Mocked
+    private PaloAuth auth;
+    @Mocked
+    private ConnectContext ctx;
+
+    @Before()
+    public void setUp() {
+        Config.disable_cluster_feature = false;
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        MockedAuth.mockedAuth(auth);
+        MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+        beIds.forEach(id -> 
Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 
2048, 0)));
+    }
+
+    @Test
+    public void testParticularBackends() throws AnalysisException {
+        List<String> backends = Lists.newArrayList(
+            "192.168.0.10003:9051", "192.168.0.10004:9051", 
"192.168.0.10005:9051", "192.168.0.10006:9051");
+        final AdminCancelRebalanceDiskStmt stmt = new 
AdminCancelRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(2, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testEmpty() throws AnalysisException {
+        List<String> backends = Lists.newArrayList();
+        final AdminCancelRebalanceDiskStmt stmt = new 
AdminCancelRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(0, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testNull() throws AnalysisException {
+        final AdminCancelRebalanceDiskStmt stmt = new 
AdminCancelRebalanceDiskStmt(null);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(4, stmt.getBackends().size());
+    }
+
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
new file mode 100644
index 0000000..e83693c
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+//import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminRebalanceDiskStmtTest {
+
+    private static Analyzer analyzer;
+
+    @Mocked
+    private PaloAuth auth;
+    @Mocked
+    private ConnectContext ctx;
+
+    @Before()
+    public void setUp() {
+        Config.disable_cluster_feature = false;
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        MockedAuth.mockedAuth(auth);
+        MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+        beIds.forEach(id -> 
Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 
2048, 0)));
+    }
+
+    @Test
+    public void testParticularBackends() throws AnalysisException {
+        List<String> backends = Lists.newArrayList(
+            "192.168.0.10003:9051", "192.168.0.10004:9051", 
"192.168.0.10005:9051", "192.168.0.10006:9051");
+        final AdminRebalanceDiskStmt stmt = new 
AdminRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(2, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testEmpty() throws AnalysisException {
+        List<String> backends = Lists.newArrayList();
+        final AdminRebalanceDiskStmt stmt = new 
AdminRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(0, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testNull() throws AnalysisException {
+        final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(null);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(4, stmt.getBackends().size());
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
similarity index 54%
copy from fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
copy to fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
index fc4dd7d..1d7cb00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -21,35 +21,32 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.RangePartitionInfo;
-import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.CloneTask;
-import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
-import org.apache.doris.thrift.TTabletInfo;
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
+import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -61,7 +58,8 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
+//import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -69,10 +67,10 @@ import java.util.stream.LongStream;
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Mocked;
-import static com.google.common.collect.MoreCollectors.onlyElement;
+//import static com.google.common.collect.MoreCollectors.onlyElement;
 
-public class RebalanceTest {
-    private static final Logger LOG = 
LogManager.getLogger(RebalanceTest.class);
+public class DiskRebalanceTest {
+    private static final Logger LOG = 
LogManager.getLogger(DiskRebalanceTest.class);
 
     @Mocked
     private Catalog catalog;
@@ -134,9 +132,32 @@ public class RebalanceTest {
         // Test mock validation
         Assert.assertEquals(111, 
Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
         
Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1,
 2, Lists.newArrayList(3L)));
+    }
+
+    private void generateStatisticMap() {
+        ClusterLoadStatistic loadStatistic = new 
ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
+                Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
+        loadStatistic.init();
+        statisticMap = HashBasedTable.create();
+        statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, 
Tag.DEFAULT_BACKEND_TAG, loadStatistic);
+    }
+
+    private void createPartitionsForTable(OlapTable olapTable, 
MaterializedIndex index, Long partitionCount) {
+        // partition id start from 31
+        LongStream.range(0, partitionCount).forEach(idx -> {
+            long id = 31 + idx;
+            Partition partition = new Partition(id, "p" + idx, index, new 
HashDistributionInfo());
+            olapTable.addPartition(partition);
+            olapTable.getPartitionInfo().addPartition(id, new 
DataProperty(TStorageMedium.HDD),
+                    ReplicaAllocation.DEFAULT_ALLOCATION, false);
+        });
+    }
 
-        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
-        beIds.forEach(id -> 
systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
+    @Test
+    public void testDiskRebalancerWithSameUsageDisk() {
+        // init system
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L);
+        beIds.forEach(id -> 
systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 
Lists.newArrayList(512L,512L), 2)));
 
         olapTable = new OlapTable(2, "fake table", new ArrayList<>(), 
KeysType.DUP_KEYS,
                 new RangePartitionInfo(), new HashDistributionInfo());
@@ -149,6 +170,7 @@ public class RebalanceTest {
                 0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
 
         // Tablet distribution: we add them to olapTable & build invertedIndex 
manually
+        // all of tablets are in first path of it's backend
         RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", 
TStorageMedium.HDD,
                 50000, Lists.newArrayList(10001L, 10002L, 10003L));
 
@@ -157,57 +179,64 @@ public class RebalanceTest {
 
         RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", 
TStorageMedium.HDD,
                 70000, Lists.newArrayList(10001L, 10002L, 10003L));
+        
+        // case start
+        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", 
Level.DEBUG);
 
-        // be4(10004) doesn't have any replica
-
+        Rebalancer rebalancer = new 
DiskRebalancer(Catalog.getCurrentSystemInfo(), 
Catalog.getCurrentInvertedIndex());
         generateStatisticMap();
+        rebalancer.updateLoadStatistic(statisticMap);
+        List<TabletSchedCtx> alternativeTablets = 
rebalancer.selectAlternativeTablets();
+        // check alternativeTablets;
+        Assert.assertTrue(alternativeTablets.isEmpty());
     }
 
-    private void generateStatisticMap() {
-        ClusterLoadStatistic loadStatistic = new 
ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
-                Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
-        loadStatistic.init();
-        statisticMap = HashBasedTable.create();
-        statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, 
Tag.DEFAULT_BACKEND_TAG, loadStatistic);
-    }
+    @Test
+    public void testDiskRebalancerWithDiffUsageDisk() {
+        // init system
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10001L, 
2048, Lists.newArrayList(1024L), 1));
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10002L, 
2048, Lists.newArrayList(1024L, 512L), 2));
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10003L, 
2048, Lists.newArrayList(1024L, 512L, 513L), 3));
 
-    private void createPartitionsForTable(OlapTable olapTable, 
MaterializedIndex index, Long partitionCount) {
-        // partition id start from 31
-        LongStream.range(0, partitionCount).forEach(idx -> {
-            long id = 31 + idx;
-            Partition partition = new Partition(id, "p" + idx, index, new 
HashDistributionInfo());
-            olapTable.addPartition(partition);
-            olapTable.getPartitionInfo().addPartition(id, new 
DataProperty(TStorageMedium.HDD),
-                    ReplicaAllocation.DEFAULT_ALLOCATION, false);
-        });
-    }
+        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), 
KeysType.DUP_KEYS,
+                new RangePartitionInfo(), new HashDistributionInfo());
+        db.createTable(olapTable);
 
-    @Test
-    public void testPartitionRebalancer() {
-        Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", 
Level.DEBUG);
+        // 1 table, 3 partitions p0,p1,p2
+        MaterializedIndex materializedIndex = new 
MaterializedIndex(olapTable.getId(), null);
+        createPartitionsForTable(olapTable, materializedIndex, 3L);
+        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", 
Lists.newArrayList(new Column()),
+                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
 
-        // Disable scheduler's rebalancer adding balance task, add balance 
tasks manually
-        Config.disable_balance = true;
-        // generate statistic map again to create skewmap
-        Config.tablet_rebalancer_type = "partition";
-        generateStatisticMap();
-        // Create a new scheduler & checker for redundant tablets handling
-        // Call runAfterCatalogReady manually instead of starting daemon thread
-        TabletSchedulerStat stat = new TabletSchedulerStat();
-        PartitionRebalancer rebalancer = new 
PartitionRebalancer(Catalog.getCurrentSystemInfo(), 
Catalog.getCurrentInvertedIndex());
-        TabletScheduler tabletScheduler = new TabletScheduler(catalog, 
systemInfoService, invertedIndex, stat, "");
-        // The rebalancer inside the scheduler will use this rebalancer, for 
getToDeleteReplicaId
-        Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
+        // Tablet distribution: we add them to olapTable & build invertedIndex 
manually
+        // all of the tablets are in the first path of their backend
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", 
TStorageMedium.HDD,
+                50000, Lists.newArrayList(10001L, 10002L, 10003L), 
Lists.newArrayList(0L, 100L, 300L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", 
TStorageMedium.HDD,
+                60000, Lists.newArrayList(10001L, 10002L, 10003L), 
Lists.newArrayList(50L, 0L, 200L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", 
TStorageMedium.HDD,
+                70000, Lists.newArrayList(10001L, 10002L, 10003L), 
Lists.newArrayList(100L, 200L, 0L));
 
-        TabletChecker tabletChecker = new TabletChecker(catalog, 
systemInfoService, tabletScheduler, stat);
+        // case start
+        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", 
Level.DEBUG);
 
+        Rebalancer rebalancer = new 
DiskRebalancer(Catalog.getCurrentSystemInfo(), 
Catalog.getCurrentInvertedIndex());
+        generateStatisticMap();
         rebalancer.updateLoadStatistic(statisticMap);
         List<TabletSchedCtx> alternativeTablets = 
rebalancer.selectAlternativeTablets();
+        // check alternativeTablets;
+        Assert.assertEquals(2, alternativeTablets.size());
+        Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
+        for (Backend be : 
Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER))
 {
+            if (!backendsWorkingSlots.containsKey(be.getId())) {
+                List<Long> pathHashes = 
be.getDisks().values().stream().map(DiskInfo::getPathHash).collect(Collectors.toList());
+                PathSlot slot = new PathSlot(pathHashes, 
Config.schedule_slot_num_per_path);
+                backendsWorkingSlots.put(be.getId(), slot);
+            }
+        }
 
-        // Run once for update slots info, scheduler won't select balance cuz 
balance is disabled
-        tabletScheduler.runAfterCatalogReady();
-
-        AgentBatchTask batchTask = new AgentBatchTask();
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
             LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
             try {
@@ -217,96 +246,17 @@ public class RebalanceTest {
                 
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
                 tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // 
rebalance tablet should be healthy first
 
-                // createCloneReplicaAndTask, create replica will change 
invertedIndex too.
-                rebalancer.createBalanceTask(tabletCtx, 
tabletScheduler.getBackendsWorkingSlots(), batchTask);
-            } catch (SchedException e) {
-                LOG.warn("schedule tablet {} failed: {}", 
tabletCtx.getTabletId(), e.getMessage());
-            }
-        }
-
-        // Show debug info of MoveInProgressMap detail
-        rebalancer.updateLoadStatistic(statisticMap);
-        rebalancer.selectAlternativeTablets();
-
-        // Get created tasks, and finish them manually
-        List<AgentTask> tasks = batchTask.getAllTasks();
-        List<Long> needCheckTablets = 
tasks.stream().map(AgentTask::getTabletId).collect(Collectors.toList());
-        LOG.info("created tasks for tablet: {}", needCheckTablets);
-        needCheckTablets.forEach(t -> Assert.assertEquals(4, 
invertedIndex.getReplicasByTabletId(t).size()));
-
-//        // If clone task execution is too slow, tabletChecker may want to 
delete the CLONE replica.
-//        tabletChecker.runAfterCatalogReady();
-//        Assert.assertTrue(tabletScheduler.containsTablet(50000));
-//        // tabletScheduler handle redundant
-//        tabletScheduler.runAfterCatalogReady();
-
-        for (Long tabletId : needCheckTablets) {
-            TabletSchedCtx tabletSchedCtx = 
alternativeTablets.stream().filter(ctx -> ctx.getTabletId() == 
tabletId).collect(onlyElement());
-            AgentTask task = tasks.stream().filter(t -> t.getTabletId() == 
tabletId).collect(onlyElement());
-
-            LOG.info("try to finish tabletCtx {}", tabletId);
-            try {
-                TFinishTaskRequest fakeReq = new TFinishTaskRequest();
-                fakeReq.task_status = new TStatus(TStatusCode.OK);
-                fakeReq.finish_tablet_infos = Lists.newArrayList(new 
TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
-                tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+                if (tabletCtx.getTabletSize() == 0) {
+                    Assert.fail("no exception");
+                } else {
+                    Assert.assertTrue(task instanceof 
StorageMediaMigrationTask);
+                }
             } catch (SchedException e) {
-                e.printStackTrace();
+                LOG.info("schedule tablet {} failed: {}", 
tabletCtx.getTabletId(), e.getMessage());
             }
         }
-
-        // NeedCheckTablets are redundant, TabletChecker will add them to 
TabletScheduler
-        tabletChecker.runAfterCatalogReady();
-        needCheckTablets.forEach(t -> Assert.assertEquals(4, 
invertedIndex.getReplicasByTabletId(t).size()));
-        needCheckTablets.forEach(t -> 
Assert.assertTrue(tabletScheduler.containsTablet(t)));
-
-        // TabletScheduler handle redundant tablet
-        tabletScheduler.runAfterCatalogReady();
-
-        // One replica is set to DECOMMISSION, still 4 replicas
-        needCheckTablets.forEach(t -> {
-            List<Replica> replicas = invertedIndex.getReplicasByTabletId(t);
-            Assert.assertEquals(4, replicas.size());
-            Replica decommissionedReplica = replicas.stream().filter(r -> 
r.getState() == Replica.ReplicaState.DECOMMISSION).collect(onlyElement());
-            // expected watermarkTxnId is 111
-            Assert.assertEquals(111, 
decommissionedReplica.getWatermarkTxnId());
-        });
-
-        // Delete replica should change invertedIndex too
-        tabletScheduler.runAfterCatalogReady();
-        needCheckTablets.forEach(t -> Assert.assertEquals(3, 
invertedIndex.getReplicasByTabletId(t).size()));
-
-        // Check moves completed
-        rebalancer.selectAlternativeTablets();
-        rebalancer.updateLoadStatistic(statisticMap);
-        AtomicLong succeeded = Deencapsulation.getField(rebalancer, 
"counterBalanceMoveSucceeded");
-        Assert.assertEquals(needCheckTablets.size(), succeeded.get());
     }
 
-    @Test
-    public void testMoveInProgressMap() {
-        Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache", 
Level.DEBUG);
-        MovesCacheMap m = new MovesCacheMap();
-        m.updateMapping(statisticMap, 3);
-        m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, 
TStorageMedium.HDD).get().put(1L, new Pair<>(null, -1L));
-        m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, 
TStorageMedium.SSD).get().put(2L, new Pair<>(null, -1L));
-        m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, 
TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
-        // Maintenance won't clean up the entries of cache
-        m.maintain();
-        Assert.assertEquals(3, m.size());
-
-        // Reset the expireAfterAccess, the whole cache map will be cleared.
-        m.updateMapping(statisticMap, 1);
-        Assert.assertEquals(0, m.size());
-
-        m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, 
TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
-        try {
-            Thread.sleep(1000);
-            m.maintain();
-            Assert.assertEquals(0, m.size());
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
 }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index fc4dd7d..e94fa86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
@@ -183,6 +184,31 @@ public class RebalanceTest {
     }
 
     @Test
+    public void testPrioBackends() {
+        Rebalancer rebalancer = new 
DiskRebalancer(Catalog.getCurrentSystemInfo(), 
Catalog.getCurrentInvertedIndex());
+        // add
+        {
+            List<Backend> backends = Lists.newArrayList();
+            for (int i = 0; i < 3; i++) {
+                backends.add(RebalancerTestUtil.createBackend(10086 + i, 2048, 
0));
+            }
+            rebalancer.addPrioBackends(backends, 1000);
+            Assert.assertTrue(rebalancer.hasPrioBackends());
+        }
+
+        // remove
+        for (int i = 0; i < 3; i++) {
+            List<Backend> backends = 
Lists.newArrayList(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
+            rebalancer.removePrioBackends(backends);
+            if (i == 2) {
+                Assert.assertFalse(rebalancer.hasPrioBackends());
+            } else {
+                Assert.assertTrue(rebalancer.hasPrioBackends());
+            }
+        }
+    }
+
+    @Test
     public void testPartitionRebalancer() {
         Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", 
Level.DEBUG);
 
@@ -218,7 +244,8 @@ public class RebalanceTest {
                 tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // 
rebalance tablet should be healthy first
 
                 // createCloneReplicaAndTask, create replica will change 
invertedIndex too.
-                rebalancer.createBalanceTask(tabletCtx, 
tabletScheduler.getBackendsWorkingSlots(), batchTask);
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx, 
tabletScheduler.getBackendsWorkingSlots());
+                batchTask.addTask(task);
             } catch (SchedException e) {
                 LOG.warn("schedule tablet {} failed: {}", 
tabletCtx.getTabletId(), e.getMessage());
             }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 4b3e4c6..5e02a51 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -40,14 +41,20 @@ public class RebalancerTestUtil {
 
     // Add only one path, PathHash:id
     public static Backend createBackend(long id, long totalCap, long usedCap) {
+        return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1);
+    }
+    // size of usedCaps should be equal to diskNum
+    public static Backend createBackend(long id, long totalCap, List<Long> 
usedCaps, int diskNum) {
         // ip:port won't be checked
         Backend be = new Backend(id, "192.168.0." + id, 9051);
         Map<String, DiskInfo> disks = Maps.newHashMap();
-        DiskInfo diskInfo = new DiskInfo("/path1");
-        diskInfo.setPathHash(id);
-        diskInfo.setTotalCapacityB(totalCap);
-        diskInfo.setDataUsedCapacityB(usedCap);
-        disks.put(diskInfo.getRootPath(), diskInfo);
+        for (int i = 0; i < diskNum; i++) {
+            DiskInfo diskInfo = new DiskInfo("/path" + (i + 1));
+            diskInfo.setPathHash(id + i);
+            diskInfo.setTotalCapacityB(totalCap);
+            diskInfo.setDataUsedCapacityB(usedCaps.get(i));
+            disks.put(diskInfo.getRootPath(), diskInfo);
+        }
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);
         be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
@@ -59,28 +66,37 @@ public class RebalancerTestUtil {
     // Only use the partition's baseIndex for simplicity
     public static void createTablet(TabletInvertedIndex invertedIndex, 
Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
                                     int tabletId, List<Long> beIds) {
+        createTablet(invertedIndex, db, olapTable, partitionName, medium, 
tabletId, beIds, null);
+    }
+    public static void createTablet(TabletInvertedIndex invertedIndex, 
Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
+                                    int tabletId, List<Long> beIds, List<Long> 
replicaSizes) {
         Partition partition = olapTable.getPartition(partitionName);
         MaterializedIndex baseIndex = partition.getBaseIndex();
         int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
 
         TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), 
partition.getId(), baseIndex.getId(),
-                schemaHash, medium);
+        schemaHash, medium);
         Tablet tablet = new Tablet(tabletId);
 
         // add tablet to olapTable
         olapTable.getPartition("p0").getBaseIndex().addTablet(tablet, 
tabletMeta);
-        createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
+        createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds, 
replicaSizes);
     }
 
     // Create replicas on backends which are numbered in beIds.
     // The tablet & replicas will be added to invertedIndex.
-    public static void createReplicasAndAddToIndex(TabletInvertedIndex 
invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
+    public static void createReplicasAndAddToIndex(TabletInvertedIndex 
invertedIndex, TabletMeta tabletMeta,
+                                                Tablet tablet, List<Long> 
beIds, List<Long> replicaSizes) {
         invertedIndex.addTablet(tablet.getId(), tabletMeta);
 
         IntStream.range(0, beIds.size()).forEach(i -> {
             Replica replica = new Replica(tablet.getId() + i, beIds.get(i), 
Replica.ReplicaState.NORMAL, 1, tabletMeta.getOldSchemaHash());
             // We've set pathHash to beId for simplicity
             replica.setPathHash(beIds.get(i));
+            if (replicaSizes != null) {
+                // for disk rebalancer, every beId corresponds to a 
replicaSize
+                replica.updateStat(replicaSizes.get(i), 0);
+            }
             // isRestore set true, to avoid modifying 
Catalog.getCurrentInvertedIndex
             tablet.addReplica(replica, true);
             invertedIndex.addReplica(tablet.getId(), replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 375c86f..c1e7d42 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -89,6 +89,7 @@ public class AgentTaskTest {
     private AgentTask rollupTask;
     private AgentTask schemaChangeTask;
     private AgentTask cancelDeleteTask;
+    private AgentTask storageMediaMigrationTask;
 
     @Before
     public void setUp() throws AnalysisException {
@@ -140,6 +141,11 @@ public class AgentTaskTest {
                 new SchemaChangeTask(null, backendId1, dbId, tableId, 
partitionId, indexId1, 
                                      tabletId1, replicaId1, columns, 
schemaHash2, schemaHash1, 
                                      shortKeyNum, storageType, null, 0, 
TKeysType.AGG_KEYS);
+
+        // storageMediaMigrationTask
+        storageMediaMigrationTask =
+                new StorageMediaMigrationTask(backendId1, tabletId1, 
schemaHash1, TStorageMedium.HDD);
+        ((StorageMediaMigrationTask) 
storageMediaMigrationTask).setDataDir("/home/a");
     }
 
     @Test
@@ -211,6 +217,15 @@ public class AgentTaskTest {
         Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
         Assert.assertEquals(schemaChangeTask.getSignature(), 
request6.getSignature());
         Assert.assertNotNull(request6.getAlterTabletReq());
+
+        // storageMediaMigrationTask
+        TAgentTaskRequest request7 =
+            (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, 
storageMediaMigrationTask);
+        Assert.assertEquals(TTaskType.STORAGE_MEDIUM_MIGRATE, 
request7.getTaskType());
+        Assert.assertEquals(storageMediaMigrationTask.getSignature(), 
request7.getSignature());
+        Assert.assertNotNull(request7.getStorageMediumMigrateReq());
+        
Assert.assertTrue(request7.getStorageMediumMigrateReq().isSetDataDir());
+        
Assert.assertEquals(request7.getStorageMediumMigrateReq().getDataDir(), 
"/home/a");
     }
 
     @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to