github-actions[bot] commented on code in PR #27555:
URL: https://github.com/apache/doris/pull/27555#discussion_r1410538136


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -324,1709 +145,1552 @@ void TaskWorkerPool::_finish_task(const 
TFinishTaskRequest& finish_task_request)
                     .error(result.status);
             try_time += 1;
         }
-        sleep(config::sleep_one_second);
+        sleep(1);
     }
 }
 
-void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
+                       const TSchemaHash schema_hash, TTabletInfo* 
tablet_info) {
+    tablet_info->__set_tablet_id(tablet_id);
+    tablet_info->__set_schema_hash(schema_hash);
+    return engine.tablet_manager()->report_tablet_info(tablet_info);
+}
 
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
+void random_sleep(int second) {
+    Random rnd(UnixMillis());
+    sleep(rnd.Uniform(second) + 1);
+}
 
-        auto& alter_inverted_index_rq = 
agent_task_req.alter_inverted_index_req;
-        LOG(INFO) << "get alter inverted index task. signature=" << 
agent_task_req.signature
-                  << ", tablet_id=" << alter_inverted_index_rq.tablet_id
-                  << ", job_id=" << alter_inverted_index_rq.job_id;
+void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& 
agent_task_req, int64_t signature,
+                  const TTaskType::type task_type, TFinishTaskRequest* 
finish_task_request) {
+    Status status;
 
-        Status status = Status::OK();
-        TabletSharedPtr tablet_ptr = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                alter_inverted_index_rq.tablet_id);
-        if (tablet_ptr != nullptr) {
-            EngineIndexChangeTask engine_task(alter_inverted_index_rq);
-            status = StorageEngine::instance()->execute_task(&engine_task);
-        } else {
-            status =
-                    Status::NotFound("could not find tablet {}", 
alter_inverted_index_rq.tablet_id);
-        }
+    std::string_view process_name;
+    switch (task_type) {
+    case TTaskType::ALTER:
+        process_name = "alter tablet";
+        break;
+    default:
+        std::string task_name;
+        EnumToString(TTaskType, task_type, task_name);
+        LOG(WARNING) << "schema change type invalid. type: " << task_name
+                     << ", signature: " << signature;
+        status = Status::NotSupported("Schema change type invalid");
+        break;
+    }
 
-        // Return result to fe
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        std::vector<TTabletInfo> finish_tablet_infos;
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to alter inverted index task, signature="
-                         << agent_task_req.signature
-                         << ", tablet_id=" << alter_inverted_index_rq.tablet_id
-                         << ", job_id=" << alter_inverted_index_rq.job_id << 
", error=" << status;
-        } else {
-            LOG(INFO) << "successfully alter inverted index task, signature="
-                      << agent_task_req.signature
-                      << ", tablet_id=" << alter_inverted_index_rq.tablet_id
-                      << ", job_id=" << alter_inverted_index_rq.job_id;
-            TTabletInfo tablet_info;
-            status = _get_tablet_info(alter_inverted_index_rq.tablet_id,
-                                      alter_inverted_index_rq.schema_hash, 
agent_task_req.signature,
-                                      &tablet_info);
-            if (status.ok()) {
-                finish_tablet_infos.push_back(tablet_info);
-            }
-            finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
-        }
-        finish_task_request.__set_task_status(status.to_thrift());
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    // Check last schema change status, if failed delete tablet file
+    // Do not need to adjust delete success or not
+    // Because if delete failed create rollup will failed
+    TTabletId new_tablet_id = 0;
+    TSchemaHash new_schema_hash = 0;
+    if (status.ok()) {
+        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
+        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
+        EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
+        status = engine_task.execute();
     }
-}
 
-void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get update tablet meta task. signature=" << 
agent_task_req.signature;
-
-        Status status;
-        auto& update_tablet_meta_req = 
agent_task_req.update_tablet_meta_info_req;
-        for (auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
-            TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                    tablet_meta_info.tablet_id);
-            if (tablet == nullptr) {
-                status = Status::NotFound("tablet not found");
-                LOG(WARNING) << "could not find tablet when update tablet 
meta. tablet_id="
-                             << tablet_meta_info.tablet_id;
-                continue;
-            }
-            bool need_to_save = false;
-            if (tablet_meta_info.__isset.storage_policy_id) {
-                
tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
-                need_to_save = true;
-            }
-            if (tablet_meta_info.__isset.is_in_memory) {
-                
tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
-                        tablet_meta_info.is_in_memory);
-                std::shared_lock rlock(tablet->get_header_lock());
-                for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
-                    
rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
-                }
-                
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
-                need_to_save = true;
-            }
-            if (tablet_meta_info.__isset.compaction_policy) {
-                if (tablet_meta_info.compaction_policy != "size_based" &&
-                    tablet_meta_info.compaction_policy != "time_series") {
-                    status = Status::InvalidArgument(
-                            "invalid compaction policy, only support for 
size_based or "
-                            "time_series");
-                    continue;
-                }
-                
tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
-                need_to_save = true;
-            }
-            if 
(tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
-                if (tablet->tablet_meta()->compaction_policy() != 
"time_series") {
-                    status = Status::InvalidArgument(
-                            "only time series compaction policy support time 
series config");
-                    continue;
-                }
-                
tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
-                        
tablet_meta_info.time_series_compaction_goal_size_mbytes);
-                need_to_save = true;
-            }
-            if 
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
-                if (tablet->tablet_meta()->compaction_policy() != 
"time_series") {
-                    status = Status::InvalidArgument(
-                            "only time series compaction policy support time 
series config");
-                    continue;
-                }
-                
tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
-                        
tablet_meta_info.time_series_compaction_file_count_threshold);
-                need_to_save = true;
-            }
-            if 
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
-                if (tablet->tablet_meta()->compaction_policy() != 
"time_series") {
-                    status = Status::InvalidArgument(
-                            "only time series compaction policy support time 
series config");
-                    continue;
-                }
-                
tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
-                        
tablet_meta_info.time_series_compaction_time_threshold_seconds);
-                need_to_save = true;
-            }
-            if (tablet_meta_info.__isset.replica_id) {
-                
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
-            }
-            if (tablet_meta_info.__isset.binlog_config) {
-                // check binlog_config require fields: enable, ttl_seconds, 
max_bytes, max_history_nums
-                auto& t_binlog_config = tablet_meta_info.binlog_config;
-                if (!t_binlog_config.__isset.enable || 
!t_binlog_config.__isset.ttl_seconds ||
-                    !t_binlog_config.__isset.max_bytes ||
-                    !t_binlog_config.__isset.max_history_nums) {
-                    status = Status::InvalidArgument("invalid binlog config, 
some fields not set");
-                    LOG(WARNING) << fmt::format(
-                            "invalid binlog config, some fields not set, 
tablet_id={}, "
-                            "t_binlog_config={}",
-                            tablet_meta_info.tablet_id,
-                            
apache::thrift::ThriftDebugString(t_binlog_config));
-                    continue;
-                }
+    if (status.ok()) {
+        s_report_version.fetch_add(1, std::memory_order_relaxed);
+    }
 
-                BinlogConfig new_binlog_config;
-                new_binlog_config = tablet_meta_info.binlog_config;
-                LOG(INFO) << fmt::format(
-                        "update tablet meta binlog config. tablet_id={}, 
old_binlog_config={}, "
-                        "new_binlog_config={}",
-                        tablet_meta_info.tablet_id,
-                        tablet->tablet_meta()->binlog_config().to_string(),
-                        new_binlog_config.to_string());
-                tablet->set_binlog_config(new_binlog_config);
-                need_to_save = true;
-            }
-            if (tablet_meta_info.__isset.enable_single_replica_compaction) {
-                std::shared_lock rlock(tablet->get_header_lock());
-                tablet->tablet_meta()
-                        ->mutable_tablet_schema()
-                        ->set_enable_single_replica_compaction(
-                                
tablet_meta_info.enable_single_replica_compaction);
-                for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
-                    
rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
-                            tablet_meta_info.enable_single_replica_compaction);
-                }
-                
tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
-                        tablet_meta_info.enable_single_replica_compaction);
-                need_to_save = true;
-            }
+    // Return result to fe
+    finish_task_request->__set_backend(BackendOptions::get_local_backend());
+    finish_task_request->__set_report_version(s_report_version);
+    finish_task_request->__set_task_type(task_type);
+    finish_task_request->__set_signature(signature);
 
-            if (tablet_meta_info.__isset.skip_write_index_on_load) {
-                std::shared_lock rlock(tablet->get_header_lock());
-                
tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
-                        tablet_meta_info.skip_write_index_on_load);
-                for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
-                    rowset_meta->tablet_schema()->set_skip_write_index_on_load(
-                            tablet_meta_info.skip_write_index_on_load);
-                }
-                tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
-                        tablet_meta_info.skip_write_index_on_load);
-                need_to_save = true;
-            }
-            if (need_to_save) {
-                std::shared_lock rlock(tablet->get_header_lock());
-                tablet->save_meta();
-            }
+    std::vector<TTabletInfo> finish_tablet_infos;
+    if (status.ok()) {
+        TTabletInfo tablet_info;
+        status = get_tablet_info(engine, new_tablet_id, new_schema_hash, 
&tablet_info);
+        if (status.ok()) {
+            finish_tablet_infos.push_back(tablet_info);
         }
+    }
 
-        LOG(INFO) << "finish update tablet meta task. signature=" << 
agent_task_req.signature;
-        if (agent_task_req.signature != -1) {
-            TFinishTaskRequest finish_task_request;
-            finish_task_request.__set_task_status(status.to_thrift());
-            
finish_task_request.__set_backend(BackendOptions::get_local_backend());
-            finish_task_request.__set_task_type(agent_task_req.task_type);
-            finish_task_request.__set_signature(agent_task_req.signature);
-            _finish_task(finish_task_request);
-            _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature);
-        }
+    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
+        LOG_WARNING("failed to {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id)
+                .error(status);
+    } else {
+        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
+        LOG_INFO("successfully {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id);
     }
+    finish_task_request->__set_task_status(status.to_thrift());
 }
 
-void TaskWorkerPool::_check_consistency_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TCheckConsistencyReq check_consistency_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+Status check_migrate_request(StorageEngine& engine, const 
TStorageMediumMigrateReq& req,
+                             TabletSharedPtr& tablet, DataDir** dest_store) {
+    int64_t tablet_id = req.tablet_id;
+    tablet = engine.tablet_manager()->get_tablet(tablet_id);
+    if (tablet == nullptr) {
+        return Status::InternalError("could not find tablet {}", tablet_id);
+    }
 
-            agent_task_req = _tasks.front();
-            check_consistency_req = agent_task_req.check_consistency_req;
-            _tasks.pop_front();
+    if (req.__isset.data_dir) {
+        // request specify the data dir
+        *dest_store = engine.get_store(req.data_dir);
+        if (*dest_store == nullptr) {
+            return Status::InternalError("could not find data dir {}", 
req.data_dir);
         }
+    } else {
+        // this is a storage medium
+        // get data dir by storage medium
 
-        uint32_t checksum = 0;
-        EngineChecksumTask engine_task(check_consistency_req.tablet_id,
-                                       check_consistency_req.schema_hash,
-                                       check_consistency_req.version, 
&checksum);
-        Status status = StorageEngine::instance()->execute_task(&engine_task);
-        if (!status.ok()) {
-            LOG_WARNING("failed to check consistency")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", check_consistency_req.tablet_id)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully check consistency")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", check_consistency_req.tablet_id)
-                    .tag("checksum", checksum);
+        // judge case when no need to migrate
+        uint32_t count = engine.available_storage_medium_type_count();
+        if (count <= 1) {
+            return Status::InternalError("available storage medium type count 
is less than 1");
         }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-        
finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
-        
finish_task_request.__set_request_version(check_consistency_req.version);
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_report_task_worker_thread_callback() {
-    StorageEngine::instance()->register_report_listener(this);
-    TReportRequest request;
-    while (_is_work) {
-        _is_doing_work = false;
-        {
-            // wait at most report_task_interval_seconds, or being notified
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait_for(
-                    worker_thread_lock, 
std::chrono::seconds(config::report_task_interval_seconds));
+        // check current tablet storage medium
+        TStorageMedium::type storage_medium = req.storage_medium;
+        TStorageMedium::type src_storage_medium = 
tablet->data_dir()->storage_medium();
+        if (src_storage_medium == storage_medium) {
+            return Status::InternalError("tablet is already on specified 
storage medium {}",
+                                         storage_medium);
         }
-        if (!_is_work) {
-            break;
+        // get a random store of specified storage medium
+        auto stores = engine.get_stores_for_create_tablet(storage_medium);
+        if (stores.empty()) {
+            return Status::InternalError("failed to get root path for create 
tablet");
         }
 
-        if (_master_info.network_address.port == 0) {
-            // port == 0 means not received heartbeat yet
-            // sleep a short time and try again
-            LOG(INFO)
-                    << "waiting to receive first heartbeat from frontend 
before doing task report";
-            continue;
-        }
+        *dest_store = stores[0];
+    }
+    if (tablet->data_dir()->path() == (*dest_store)->path()) {
+        LOG_WARNING("tablet is already on specified path").tag("path", 
tablet->data_dir()->path());
+        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on 
specified path: {}",
+                                                        
tablet->data_dir()->path());
+    }
 
-        _is_doing_work = true;
-        // See _random_sleep() comment in 
_report_disk_state_worker_thread_callback
-        _random_sleep(5);
-        {
-            std::lock_guard<std::mutex> 
task_signatures_lock(_s_task_signatures_lock);
-            request.__set_tasks(_s_task_signatures);
-            request.__set_backend(BackendOptions::get_local_backend());
-        }
-        _handle_report(request, ReportType::TASK);
+    // check local disk capacity
+    int64_t tablet_size = tablet->tablet_local_size();
+    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
+        return Status::InternalError("reach the capacity limit of path {}, 
tablet_size={}",
+                                     (*dest_store)->path(), tablet_size);
     }
-    StorageEngine::instance()->deregister_report_listener(this);
+    return Status::OK();
 }
 
-/// disk state report thread will report disk state at a configurable fix 
interval.
-void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
-    StorageEngine::instance()->register_report_listener(this);
-
-    while (_is_work) {
-        _is_doing_work = false;
-        {
-            // wait at most report_disk_state_interval_seconds, or being 
notified
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait_for(
-                    worker_thread_lock,
-                    
std::chrono::seconds(config::report_disk_state_interval_seconds));
-        }
-        if (!_is_work) {
-            break;
-        }
-
-        if (_master_info.network_address.port == 0) {
-            // port == 0 means not received heartbeat yet
-            LOG(INFO)
-                    << "waiting to receive first heartbeat from frontend 
before doing disk report";
-            continue;
-        }
+// Return `true` if report success
+bool handle_report(const TReportRequest& request, const TMasterInfo& 
master_info,
+                   std::string_view name) {
+    TMasterResult result;
+    Status status = MasterServerClient::instance()->report(request, &result);
+    if (!status.ok()) [[unlikely]] {
+        LOG_WARNING("failed to report {}", name)
+                .tag("host", master_info.network_address.hostname)
+                .tag("port", master_info.network_address.port)
+                .error(status);
+        return false;
+    }
 
-        _is_doing_work = true;
-        // Random sleep 1~5 seconds before doing report.
-        // In order to avoid the problem that the FE receives many report 
requests at the same time
-        // and can not be processed.
-        _random_sleep(5);
-
-        TReportRequest request;
-        request.__set_backend(BackendOptions::get_local_backend());
-        request.__isset.disks = true;
-
-        std::vector<DataDirInfo> data_dir_infos;
-        
static_cast<void>(StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos,
-                                                                           
true /* update */));
-
-        for (auto& root_path_info : data_dir_infos) {
-            TDisk disk;
-            disk.__set_root_path(root_path_info.path);
-            disk.__set_path_hash(root_path_info.path_hash);
-            disk.__set_storage_medium(root_path_info.storage_medium);
-            disk.__set_disk_total_capacity(root_path_info.disk_capacity);
-            disk.__set_data_used_capacity(root_path_info.local_used_capacity);
-            
disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
-            disk.__set_disk_available_capacity(root_path_info.available);
-            disk.__set_trash_used_capacity(root_path_info.trash_used_capacity);
-            disk.__set_used(root_path_info.is_used);
-            request.disks[root_path_info.path] = disk;
-        }
-        request.__set_num_cores(CpuInfo::num_cores());
-        request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
-                                                     ? 
config::pipeline_executor_size
-                                                     : CpuInfo::num_cores());
-        _handle_report(request, ReportType::DISK);
+    else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
+        LOG_WARNING("failed to report {}", name)
+                .tag("host", master_info.network_address.hostname)
+                .tag("port", master_info.network_address.port)
+                .error(result.status);
+        return false;
     }
-    StorageEngine::instance()->deregister_report_listener(this);
+
+    return true;
 }
 
-void TaskWorkerPool::_report_tablet_worker_thread_callback() {
-    StorageEngine::instance()->register_report_listener(this);
+void _submit_task(const TAgentTaskRequest& task,
+                  std::function<Status(const TAgentTaskRequest&)> submit_op) {
+    const TTaskType::type task_type = task.task_type;
+    int64_t signature = task.signature;
 
-    while (_is_work) {
-        _is_doing_work = false;
+    std::string type_str;
+    EnumToString(TTaskType, task_type, type_str);
+    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << 
signature;
 
-        {
-            // wait at most report_tablet_interval_seconds, or being notified
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait_for(
-                    worker_thread_lock,
-                    
std::chrono::seconds(config::report_tablet_interval_seconds));
-        }
-        if (!_is_work) {
-            break;
-        }
+    if (!register_task_info(task_type, signature)) {
+        LOG_WARNING("failed to register task").tag("type", 
type_str).tag("signature", signature);
+        return;
+    }
 
-        if (_master_info.network_address.port == 0) {
-            // port == 0 means not received heartbeat yet
-            LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing tablet "
-                         "report";
-            continue;
-        }
+    // Set the receiving time of task so that we can determine whether it is 
timed out later
+    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
+    auto st = submit_op(task);
+    if (!st.ok()) [[unlikely]] {
+        LOG_INFO("failed to submit task").tag("type", 
type_str).tag("signature", signature);
+        return;
+    }
 
-        _is_doing_work = true;
-        // See _random_sleep() comment in 
_report_disk_state_worker_thread_callback
-        _random_sleep(5);
-
-        TReportRequest request;
-        request.__set_backend(BackendOptions::get_local_backend());
-        request.__isset.tablets = true;
-
-        uint64_t report_version = _s_report_version;
-        static_cast<void>(
-                
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
-                        &request.tablets));
-        if (report_version < _s_report_version) {
-            // TODO llj This can only reduce the possibility for report error, 
but can't avoid it.
-            // If FE create a tablet in FE meta and send CREATE task to this 
BE, the tablet may not be included in this
-            // report, and the report version has a small probability that it 
has not been updated in time. When FE
-            // receives this report, it is possible to delete the new tablet.
-            LOG(WARNING) << "report version " << report_version << " change to 
"
-                         << _s_report_version;
-            
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
-            continue;
-        }
-        int64_t max_compaction_score =
-                
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
-                         
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
-        request.__set_tablet_max_compaction_score(max_compaction_score);
-        request.__set_report_version(report_version);
-
-        // report storage policy and resource
-        auto& storage_policy_list = request.storage_policy;
-        for (auto [id, version] : get_storage_policy_ids()) {
-            auto& storage_policy = storage_policy_list.emplace_back();
-            storage_policy.__set_id(id);
-            storage_policy.__set_version(version);
-        }
-        request.__isset.storage_policy = true;
-        auto& resource_list = request.resource;
-        for (auto [id, version] : get_storage_resource_ids()) {
-            auto& resource = resource_list.emplace_back();
-            resource.__set_id(id);
-            resource.__set_version(version);
-        }
-        request.__isset.resource = true;
+    LOG_INFO("successfully submit task").tag("type", 
type_str).tag("signature", signature);
+}
 
-        _handle_report(request, ReportType::TABLET);
+bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
+
+bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", 
"ALTER_INVERTED_INDEX");
+bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY");
+bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD");
+bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD");
+bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT");
+bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT");
+bvar::Adder<uint64_t> MOVE_count("task", "MOVE");
+bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION");
+bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY");
+bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF");
+bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE");
+bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE");
+bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION");
+bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", 
"CLEAR_TRANSACTION_TASK");
+bvar::Adder<uint64_t> DELETE_count("task", "DELETE");
+bvar::Adder<uint64_t> PUSH_count("task", "PUSH");
+bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", 
"UPDATE_TABLET_META_INFO");
+bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
+bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
+bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", 
"STORAGE_MEDIUM_MIGRATE");
+bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
+
+void add_task_count(const TAgentTaskRequest& task, int n) {
+    // clang-format off
+    switch (task.task_type) {
+    #define ADD_TASK_COUNT(type) \
+    case TTaskType::type:        \
+        type##_count << n;       \
+        return;
+    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
+    ADD_TASK_COUNT(CHECK_CONSISTENCY)
+    ADD_TASK_COUNT(UPLOAD)
+    ADD_TASK_COUNT(DOWNLOAD)
+    ADD_TASK_COUNT(MAKE_SNAPSHOT)
+    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
+    ADD_TASK_COUNT(MOVE)
+    ADD_TASK_COUNT(COMPACTION)
+    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
+    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
+    ADD_TASK_COUNT(CREATE)
+    ADD_TASK_COUNT(DROP)
+    ADD_TASK_COUNT(PUBLISH_VERSION)
+    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
+    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
+    ADD_TASK_COUNT(ALTER)
+    ADD_TASK_COUNT(CLONE)
+    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
+    ADD_TASK_COUNT(GC_BINLOG)
+    #undef ADD_TASK_COUNT
+    case TTaskType::REALTIME_PUSH:
+    case TTaskType::PUSH:
+        if (task.push_req.push_type == TPushType::LOAD_V2) {
+            PUSH_count << n;
+        } else if (task.push_req.push_type == TPushType::DELETE) {
+            DELETE_count << n;
+        }
+        return;
+    default:
+        return;
     }
-    StorageEngine::instance()->deregister_report_listener(this);
+    // clang-format on
 }
 
-void TaskWorkerPool::_upload_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TUploadReq upload_request;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            upload_request = agent_task_req.upload_req;
-            _tasks.pop_front();
-        }
+bvar::Adder<uint64_t> report_task_total("report", "task_total");
+bvar::Adder<uint64_t> report_task_failed("report", "task_failed");
+bvar::Adder<uint64_t> report_disk_total("report", "disk_total");
+bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed");
+bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total");
+bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
+
+} // namespace
+
+TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count,
+                               std::function<void(const TAgentTaskRequest& 
task)> callback)
+        : _callback(std::move(callback)) {
+    auto st = ThreadPoolBuilder(fmt::format("TaskWorkerPool.{}", name))
+                      .set_min_threads(worker_count)
+                      .set_max_threads(worker_count)
+                      .build(&_thread_pool);
+    CHECK(st.ok()) << name << ": " << st;
+}
 
-        LOG(INFO) << "get upload task. signature=" << agent_task_req.signature
-                  << ", job_id=" << upload_request.job_id;
+TaskWorkerPool::~TaskWorkerPool() {
+    stop();
+}
 
-        std::map<int64_t, std::vector<std::string>> tablet_files;
-        std::unique_ptr<SnapshotLoader> loader = 
std::make_unique<SnapshotLoader>(
-                _env, upload_request.job_id, agent_task_req.signature, 
upload_request.broker_addr,
-                upload_request.broker_prop);
-        Status status = loader->init(
-                upload_request.__isset.storage_backend ? 
upload_request.storage_backend
-                                                       : 
TStorageBackendType::type::BROKER,
-                upload_request.__isset.location ? upload_request.location : 
"");
-        if (status.ok()) {
-            status = loader->upload(upload_request.src_dest_map, 
&tablet_files);
-        }
+void TaskWorkerPool::stop() {
+    if (_stopped.exchange(true)) {
+        return;
+    }
 
-        if (!status.ok()) {
-            LOG_WARNING("failed to upload")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", upload_request.job_id)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully upload")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", upload_request.job_id);
-        }
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
+}
 
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_tablet_files(tablet_files);
+void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+    _submit_task(task, [this](auto&& task) {
+        add_task_count(task, 1);
+        return _thread_pool->submit_func([this, task]() {
+            _callback(task);
+            add_task_count(task, -1);
+        });
+    });
+}
 
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
+PriorTaskWorkerPool::PriorTaskWorkerPool(
+        std::string_view name, int normal_worker_count, int 
high_prior_worker_conut,
+        std::function<void(const TAgentTaskRequest& task)> callback)
+        : _callback(std::move(callback)) {
+    auto st = ThreadPoolBuilder(fmt::format("TaskWorkerPool.{}", name))
+                      .set_min_threads(normal_worker_count)
+                      .set_max_threads(normal_worker_count)
+                      .build(&_normal_pool);
+    CHECK(st.ok()) << name << ": " << st;
+
+    st = _normal_pool->submit_func([this] { normal_loop(); });
+    CHECK(st.ok()) << name << ": " << st;
+
+    st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
+                 .set_min_threads(high_prior_worker_conut)
+                 .set_max_threads(high_prior_worker_conut)
+                 .build(&_high_prior_pool);
+    CHECK(st.ok()) << name << ": " << st;
+
+    st = _high_prior_pool->submit_func([this] { high_prior_loop(); });
+    CHECK(st.ok()) << name << ": " << st;
 }
 
-void TaskWorkerPool::_download_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TDownloadReq download_request;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+PriorTaskWorkerPool::~PriorTaskWorkerPool() {
+    stop();
+}
 
-            agent_task_req = _tasks.front();
-            download_request = agent_task_req.download_req;
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get download task. signature=" << 
agent_task_req.signature
-                  << ", job_id=" << download_request.job_id
-                  << ", task detail: " << 
apache::thrift::ThriftDebugString(download_request);
-
-        // TODO: download
-        std::vector<int64_t> downloaded_tablet_ids;
-
-        auto status = Status::OK();
-        if (download_request.__isset.remote_tablet_snapshots) {
-            std::unique_ptr<SnapshotLoader> loader = 
std::make_unique<SnapshotLoader>(
-                    _env, download_request.job_id, agent_task_req.signature);
-            status = 
loader->remote_http_download(download_request.remote_tablet_snapshots,
-                                                  &downloaded_tablet_ids);
-        } else {
-            std::unique_ptr<SnapshotLoader> loader = 
std::make_unique<SnapshotLoader>(
-                    _env, download_request.job_id, agent_task_req.signature,
-                    download_request.broker_addr, 
download_request.broker_prop);
-            status = loader->init(
-                    download_request.__isset.storage_backend ? 
download_request.storage_backend
-                                                             : 
TStorageBackendType::type::BROKER,
-                    download_request.__isset.location ? 
download_request.location : "");
-            if (status.ok()) {
-                status = loader->download(download_request.src_dest_map, 
&downloaded_tablet_ids);
-            }
+void PriorTaskWorkerPool::stop() {
+    {
+        std::lock_guard lock(_mtx);
+        if (_stopped) {
+            return;
         }
 
-        if (!status.ok()) {
-            LOG_WARNING("failed to download")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", download_request.job_id)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully download")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", download_request.job_id);
-        }
+        _stopped = true;
+    }
+    _normal_condv.notify_all();
+    _high_prior_condv.notify_all();
 
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids);
+    if (_normal_pool) {
+        _normal_pool->shutdown();
+    }
 
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    if (_high_prior_pool) {
+        _high_prior_pool->shutdown();
     }
 }
 
-void TaskWorkerPool::_make_snapshot_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TSnapshotRequest snapshot_request;
+void PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
+    _submit_task(task, [this](auto&& task) {
+        auto req = std::make_unique<TAgentTaskRequest>(task);
+        add_task_count(*req, 1);
+        if (req->__isset.priority && req->priority == TPriority::HIGH) {
+            std::lock_guard lock(_mtx);
+            _high_prior_queue.push_back(std::move(req));
+            _high_prior_condv.notify_one();
+            _normal_condv.notify_one();
+        } else {
+            std::lock_guard lock(_mtx);
+            _normal_queue.push_back(std::move(req));
+            _normal_condv.notify_one();
+        }
+        return Status::OK();
+    });
+}
+
+void PriorTaskWorkerPool::normal_loop() {
+    while (true) {
+        std::unique_ptr<TAgentTaskRequest> req;
+
         {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
+            std::unique_lock lock(_mtx);
+            _normal_condv.wait(lock, [&] {
+                return !_normal_queue.empty() || !_high_prior_queue.empty() || 
_stopped;
+            });
+
+            if (_stopped) {
                 return;
             }
 
-            agent_task_req = _tasks.front();
-            snapshot_request = agent_task_req.snapshot_req;
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get snapshot task. signature=" << 
agent_task_req.signature;
-
-        string snapshot_path;
-        bool allow_incremental_clone = false; // not used
-        std::vector<string> snapshot_files;
-        Status status = 
SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path,
-                                                                   
&allow_incremental_clone);
-        if (status.ok() && snapshot_request.__isset.list_files) {
-            // list and save all snapshot files
-            // snapshot_path like: data/snapshot/20180417205230.1.86400
-            // we need to add subdir: tablet_id/schema_hash/
-            std::vector<io::FileInfo> files;
-            bool exists = true;
-            io::Path path = fmt::format("{}/{}/{}/", snapshot_path, 
snapshot_request.tablet_id,
-                                        snapshot_request.schema_hash);
-            status = io::global_local_filesystem()->list(path, true, &files, 
&exists);
-            if (status.ok()) {
-                for (auto& file : files) {
-                    snapshot_files.push_back(file.file_name);
-                }
+            if (!_high_prior_queue.empty()) {
+                req = std::move(_high_prior_queue.front());
+                _high_prior_queue.pop_front();
+            } else if (!_normal_queue.empty()) {
+                req = std::move(_normal_queue.front());
+                _normal_queue.pop_front();
+            } else {
+                continue;
             }
         }
-        if (!status.ok()) {
-            LOG_WARNING("failed to make snapshot")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", snapshot_request.tablet_id)
-                    .tag("version", snapshot_request.version)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully make snapshot")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", snapshot_request.tablet_id)
-                    .tag("version", snapshot_request.version)
-                    .tag("snapshot_path", snapshot_path);
-        }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_snapshot_path(snapshot_path);
-        finish_task_request.__set_snapshot_files(snapshot_files);
-        finish_task_request.__set_task_status(status.to_thrift());
 
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+        _callback(*req);
+        add_task_count(*req, -1);
     }
 }
 
-void TaskWorkerPool::_release_snapshot_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TReleaseSnapshotRequest release_snapshot_request;
+void PriorTaskWorkerPool::high_prior_loop() {
+    while (true) {
+        std::unique_ptr<TAgentTaskRequest> req;
+
         {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
+            std::unique_lock lock(_mtx);
+            _high_prior_condv.wait(lock, [&] { return 
!_high_prior_queue.empty() || _stopped; });
+
+            if (_stopped) {
                 return;
             }
 
-            agent_task_req = _tasks.front();
-            release_snapshot_request = agent_task_req.release_snapshot_req;
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get release snapshot task. signature=" << 
agent_task_req.signature;
+            if (_high_prior_queue.empty()) {
+                continue;
+            }
 
-        string& snapshot_path = release_snapshot_request.snapshot_path;
-        Status status = 
SnapshotManager::instance()->release_snapshot(snapshot_path);
-        if (!status.ok()) {
-            LOG_WARNING("failed to release snapshot")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("snapshot_path", snapshot_path)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully release snapshot")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("snapshot_path", snapshot_path);
+            req = std::move(_high_prior_queue.front());
+            _high_prior_queue.pop_front();
         }
 
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+        _callback(*req);
+        add_task_count(*req, -1);
     }
 }
 
-Status TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, const 
TSchemaHash schema_hash,
-                                        int64_t signature, TTabletInfo* 
tablet_info) {
-    tablet_info->__set_tablet_id(tablet_id);
-    tablet_info->__set_schema_hash(schema_hash);
-    return 
StorageEngine::instance()->tablet_manager()->report_tablet_info(tablet_info);
-}
+ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, 
int report_interval_s,
+                           std::function<void()> callback)
+        : _name(std::move(name)) {
+    auto report_loop = [this, &master_info, report_interval_s, callback = 
std::move(callback)] {
+        StorageEngine::instance()->register_report_listener(this);
+        while (true) {
+            {
+                std::unique_lock lock(_mtx);
+                _condv.wait_for(lock, std::chrono::seconds(report_interval_s),
+                                [&] { return _stopped || _signal; });
 
-void TaskWorkerPool::_move_dir_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TMoveDirReq move_dir_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
+                if (_stopped) {
+                    break;
+                }
+
+                if (_signal) {
+                    // Consume received signal
+                    _signal = false;
+                }
             }
 
-            agent_task_req = _tasks.front();
-            move_dir_req = agent_task_req.move_dir_req;
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get move dir task. signature=" << 
agent_task_req.signature
-                  << ", job_id=" << move_dir_req.job_id;
-        Status status = _move_dir(move_dir_req.tablet_id, move_dir_req.src, 
move_dir_req.job_id,
-                                  true /* TODO */);
+            if (master_info.network_address.port == 0) {
+                // port == 0 means not received heartbeat yet
+                LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing report";
+                continue;
+            }
 
-        if (!status.ok()) {
-            LOG_WARNING("failed to move dir")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", move_dir_req.job_id)
-                    .tag("tablet_id", move_dir_req.tablet_id)
-                    .tag("src", move_dir_req.src)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully move dir")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("job_id", move_dir_req.job_id)
-                    .tag("tablet_id", move_dir_req.tablet_id)
-                    .tag("src", move_dir_req.src);
+            callback();
         }
+        StorageEngine::instance()->deregister_report_listener(this);
+    };
 
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
+    auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
+    CHECK(st.ok()) << name << ": " << st;
 }
 
-Status TaskWorkerPool::_move_dir(const TTabletId tablet_id, const std::string& 
src, int64_t job_id,
-                                 bool overwrite) {
-    TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
-    if (tablet == nullptr) {
-        return Status::InvalidArgument("Could not find tablet");
-    }
-    SnapshotLoader loader(_env, job_id, tablet_id);
-    return loader.move(src, tablet, overwrite);
+ReportWorker::~ReportWorker() {
+    stop();
 }
 
-void TaskWorkerPool::_handle_report(const TReportRequest& request, ReportType 
type) {
-    TMasterResult result;
-    Status status = MasterServerClient::instance()->report(request, &result);
-    bool is_report_success = false;
-    if (!status.ok()) {
-        LOG_WARNING("failed to report {}", TYPE_STRING(type))
-                .tag("host", _master_info.network_address.hostname)
-                .tag("port", _master_info.network_address.port)
-                .error(status);
-    } else if (result.status.status_code != TStatusCode::OK) {
-        LOG_WARNING("failed to report {}", TYPE_STRING(type))
-                .tag("host", _master_info.network_address.hostname)
-                .tag("port", _master_info.network_address.port)
-                .error(result.status);
-    } else {
-        is_report_success = true;
-    }
-    switch (type) {
-    case TASK:
-        DorisMetrics::instance()->report_task_requests_total->increment(1);
-        if (!is_report_success) {
-            
DorisMetrics::instance()->report_task_requests_failed->increment(1);
-        }
-        break;
-    case DISK:
-        DorisMetrics::instance()->report_disk_requests_total->increment(1);
-        if (!is_report_success) {
-            
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
-        }
-        break;
-    case TABLET:
-        DorisMetrics::instance()->report_tablet_requests_total->increment(1);
-        if (!is_report_success) {
-            
DorisMetrics::instance()->report_tablet_requests_failed->increment(1);
-        }
-        break;
-    default:
-        break;
+void ReportWorker::notify() {
+    {
+        std::lock_guard lock(_mtx);
+        _signal = true;
     }
+    _condv.notify_all();
 }
 
-void TaskWorkerPool::_random_sleep(int second) {
-    Random rnd(UnixMillis());
-    sleep(rnd.Uniform(second) + 1);
-}
-
-void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TCompactionReq compaction_req;
-
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            compaction_req = agent_task_req.compaction_req;
-            _tasks.pop_front();
+void ReportWorker::stop() {
+    {
+        std::lock_guard lock(_mtx);
+        if (_stopped) {
+            return;
         }
 
-        LOG(INFO) << "get compaction task. signature=" << 
agent_task_req.signature
-                  << ", compaction_type=" << compaction_req.type;
-
-        CompactionType compaction_type;
-        if (compaction_req.type == "base") {
-            compaction_type = CompactionType::BASE_COMPACTION;
-        } else {
-            compaction_type = CompactionType::CUMULATIVE_COMPACTION;
-        }
+        _stopped = true;
+    }
+    _condv.notify_all();
+    if (_thread) {
+        _thread->join();
+    }
+}
 
-        TabletSharedPtr tablet_ptr =
-                
StorageEngine::instance()->tablet_manager()->get_tablet(compaction_req.tablet_id);
-        if (tablet_ptr != nullptr) {
-            auto data_dir = tablet_ptr->data_dir();
-            if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), 
compaction_type)) {
-                LOG(WARNING) << "could not do compaction. tablet_id=" << 
tablet_ptr->tablet_id()
-                             << ", compaction_type=" << compaction_type;
-                _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature);
-                continue;
-            }
+void alter_inverted_index_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    const auto& alter_inverted_index_rq = req.alter_inverted_index_req;
+    LOG(INFO) << "get alter inverted index task. signature=" << req.signature
+              << ", tablet_id=" << alter_inverted_index_rq.tablet_id
+              << ", job_id=" << alter_inverted_index_rq.job_id;
+
+    Status status = Status::OK();
+    auto tablet_ptr = 
engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
+    if (tablet_ptr != nullptr) {
+        EngineIndexChangeTask engine_task(alter_inverted_index_rq);
+        status = engine_task.execute();
+    } else {
+        status = Status::NotFound("could not find tablet {}", 
alter_inverted_index_rq.tablet_id);
+    }
 
-            Status status = StorageEngine::instance()->submit_compaction_task(
-                    tablet_ptr, compaction_type, false);
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to submit table compaction task. 
error=" << status;
-            }
-            _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature);
+    // Return result to fe
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    std::vector<TTabletInfo> finish_tablet_infos;
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to alter inverted index task, signature=" << 
req.signature
+                     << ", tablet_id=" << alter_inverted_index_rq.tablet_id
+                     << ", job_id=" << alter_inverted_index_rq.job_id << ", 
error=" << status;
+    } else {
+        LOG(INFO) << "successfully alter inverted index task, signature=" << 
req.signature
+                  << ", tablet_id=" << alter_inverted_index_rq.tablet_id
+                  << ", job_id=" << alter_inverted_index_rq.job_id;
+        TTabletInfo tablet_info;
+        status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id,
+                                 alter_inverted_index_rq.schema_hash, 
&tablet_info);
+        if (status.ok()) {
+            finish_tablet_infos.push_back(tablet_info);
         }
+        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
     }
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+void update_tablet_meta_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;
 
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
+    Status status;
+    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
+    for (const auto& tablet_meta_info : 
update_tablet_meta_req.tabletMetaInfos) {
+        auto tablet = 
engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
+        if (tablet == nullptr) {
+            status = Status::NotFound("tablet not found");
+            LOG(WARNING) << "could not find tablet when update tablet meta. 
tablet_id="
+                         << tablet_meta_info.tablet_id;
+            continue;
         }
-        TPushStoragePolicyReq& push_storage_policy_req = 
agent_task_req.push_storage_policy_req;
-        // refresh resource
-        for (auto& resource : push_storage_policy_req.resource) {
-            auto existed_resource = get_storage_resource(resource.id);
-            if (existed_resource.version >= resource.version) {
-                continue;
-            }
-            if (resource.__isset.s3_storage_param) {
-                Status st;
-                S3Conf s3_conf;
-                s3_conf.ak = std::move(resource.s3_storage_param.ak);
-                s3_conf.sk = std::move(resource.s3_storage_param.sk);
-                s3_conf.endpoint = 
std::move(resource.s3_storage_param.endpoint);
-                s3_conf.region = std::move(resource.s3_storage_param.region);
-                s3_conf.prefix = 
std::move(resource.s3_storage_param.root_path);
-                s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
-                s3_conf.connect_timeout_ms = 
resource.s3_storage_param.conn_timeout_ms;
-                s3_conf.max_connections = resource.s3_storage_param.max_conn;
-                s3_conf.request_timeout_ms = 
resource.s3_storage_param.request_timeout_ms;
-                // When using cold heat separation in minio, user might use ip 
address directly,
-                // which needs enable use_virtual_addressing to true
-                s3_conf.use_virtual_addressing = 
!resource.s3_storage_param.use_path_style;
-                std::shared_ptr<io::S3FileSystem> fs;
-                if (existed_resource.fs == nullptr) {
-                    st = io::S3FileSystem::create(s3_conf, 
std::to_string(resource.id), &fs);
-                } else {
-                    fs = 
std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
-                    fs->set_conf(s3_conf);
-                }
-                if (!st.ok()) {
-                    LOG(WARNING) << "update s3 resource failed: " << st;
-                } else {
-                    LOG_INFO("successfully update s3 resource")
-                            .tag("resource_id", resource.id)
-                            .tag("resource_name", resource.name)
-                            .tag("s3_conf", s3_conf.to_string());
-                    put_storage_resource(resource.id, {std::move(fs), 
resource.version});
-                }
-            } else if (resource.__isset.hdfs_storage_param) {
-                Status st;
-                std::shared_ptr<io::HdfsFileSystem> fs;
-                if (existed_resource.fs == nullptr) {
-                    st = 
io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
-                } else {
-                    fs = 
std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
-                }
-                if (!st.ok()) {
-                    LOG(WARNING) << "update hdfs resource failed: " << st;
-                } else {
-                    LOG_INFO("successfully update hdfs resource")
-                            .tag("resource_id", resource.id)
-                            .tag("resource_name", resource.name);
-                    put_storage_resource(resource.id, {std::move(fs), 
resource.version});
-                }
-            } else {
-                LOG(WARNING) << "unknown resource=" << resource;
+        bool need_to_save = false;
+        if (tablet_meta_info.__isset.storage_policy_id) {
+            
tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
+            need_to_save = true;
+        }
+        if (tablet_meta_info.__isset.is_in_memory) {
+            tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
+                    tablet_meta_info.is_in_memory);
+            std::shared_lock rlock(tablet->get_header_lock());
+            for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
+                
rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
             }
-        }
-        // drop storage policy
-        for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
-            delete_storage_policy(policy_id);
-        }
-        // refresh storage policy
-        for (auto& storage_policy : push_storage_policy_req.storage_policy) {
-            auto existed_storage_policy = 
get_storage_policy(storage_policy.id);
-            if (existed_storage_policy == nullptr ||
-                existed_storage_policy->version < storage_policy.version) {
-                auto storage_policy1 = std::make_shared<StoragePolicy>();
-                storage_policy1->name = std::move(storage_policy.name);
-                storage_policy1->version = storage_policy.version;
-                storage_policy1->cooldown_datetime = 
storage_policy.cooldown_datetime;
-                storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
-                storage_policy1->resource_id = storage_policy.resource_id;
-                LOG_INFO("successfully update storage policy")
-                        .tag("storage_policy_id", storage_policy.id)
-                        .tag("storage_policy", storage_policy1->to_string());
-                put_storage_policy(storage_policy.id, 
std::move(storage_policy1));
+            
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
+            need_to_save = true;
+        }
+        if (tablet_meta_info.__isset.compaction_policy) {
+            if (tablet_meta_info.compaction_policy != "size_based" &&
+                tablet_meta_info.compaction_policy != "time_series") {
+                status = Status::InvalidArgument(
+                        "invalid compaction policy, only support for 
size_based or "
+                        "time_series");
+                continue;
             }
+            
tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
+            need_to_save = true;
         }
-    }
-}
-
-void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
+        if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
+            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                status = Status::InvalidArgument(
+                        "only time series compaction policy support time 
series config");
+                continue;
             }
-            if (!_is_work) {
-                return;
+            tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
+                    tablet_meta_info.time_series_compaction_goal_size_mbytes);
+            need_to_save = true;
+        }
+        if 
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
+            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                status = Status::InvalidArgument(
+                        "only time series compaction policy support time 
series config");
+                continue;
             }
-
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
-
-        TPushCooldownConfReq& push_cooldown_conf_req = 
agent_task_req.push_cooldown_conf;
-        for (auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
-            int64_t tablet_id = cooldown_conf.tablet_id;
-            TabletSharedPtr tablet =
-                    
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
-            if (tablet == nullptr) {
-                LOG(WARNING) << "failed to get tablet. tablet_id=" << 
tablet_id;
+            
tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
+                    
tablet_meta_info.time_series_compaction_file_count_threshold);
+            need_to_save = true;
+        }
+        if 
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
+            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                status = Status::InvalidArgument(
+                        "only time series compaction policy support time 
series config");
                 continue;
             }
-            if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
-                                             
cooldown_conf.cooldown_replica_id) &&
-                cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
-                tablet->tablet_meta()->cooldown_meta_id().initialized()) {
-                Tablet::async_write_cooldown_meta(tablet);
+            
tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
+                    
tablet_meta_info.time_series_compaction_time_threshold_seconds);
+            need_to_save = true;
+        }
+        if (tablet_meta_info.__isset.replica_id) {
+            tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
+        }
+        if (tablet_meta_info.__isset.binlog_config) {
+            // check binlog_config require fields: enable, ttl_seconds, 
max_bytes, max_history_nums
+            const auto& t_binlog_config = tablet_meta_info.binlog_config;
+            if (!t_binlog_config.__isset.enable || 
!t_binlog_config.__isset.ttl_seconds ||
+                !t_binlog_config.__isset.max_bytes || 
!t_binlog_config.__isset.max_history_nums) {
+                status = Status::InvalidArgument("invalid binlog config, some 
fields not set");
+                LOG(WARNING) << fmt::format(
+                        "invalid binlog config, some fields not set, 
tablet_id={}, "
+                        "t_binlog_config={}",
+                        tablet_meta_info.tablet_id,
+                        apache::thrift::ThriftDebugString(t_binlog_config));
+                continue;
             }
-        }
-    }
-}
-
-CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel 
thread_model)
-        : TaskWorkerPool(TaskWorkerType::CREATE_TABLE, env, 
*env->master_info(), thread_model) {
-    _worker_count = config::create_tablet_worker_count;
-    _cb = [this]() { _create_tablet_worker_thread_callback(); };
-}
 
-void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
+            BinlogConfig new_binlog_config;
+            new_binlog_config = tablet_meta_info.binlog_config;
+            LOG(INFO) << fmt::format(
+                    "update tablet meta binlog config. tablet_id={}, 
old_binlog_config={}, "
+                    "new_binlog_config={}",
+                    tablet_meta_info.tablet_id, 
tablet->tablet_meta()->binlog_config().to_string(),
+                    new_binlog_config.to_string());
+            tablet->set_binlog_config(new_binlog_config);
+            need_to_save = true;
+        }
+        if (tablet_meta_info.__isset.enable_single_replica_compaction) {
+            std::shared_lock rlock(tablet->get_header_lock());
+            
tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
+                    tablet_meta_info.enable_single_replica_compaction);
+            for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
+                
rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
+                        tablet_meta_info.enable_single_replica_compaction);
             }
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
+            
tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
+                    tablet_meta_info.enable_single_replica_compaction);
+            need_to_save = true;
         }
-        const TCreateTabletReq& create_tablet_req = 
agent_task_req.create_tablet_req;
-        RuntimeProfile runtime_profile("CreateTablet");
-        RuntimeProfile* profile = &runtime_profile;
-        MonotonicStopWatch watch;
-        watch.start();
-        SCOPED_CLEANUP({
-            int64_t elapsed_time = static_cast<int64_t>(watch.elapsed_time());
-            if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
-                COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
-                std::stringstream ss;
-                profile->pretty_print(&ss);
-                LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 
<< std::endl
-                             << ss.str();
-            }
-        });
-        DorisMetrics::instance()->create_tablet_requests_total->increment(1);
-        VLOG_NOTICE << "start to create tablet " << 
create_tablet_req.tablet_id;
 
-        std::vector<TTabletInfo> finish_tablet_infos;
-        VLOG_NOTICE << "create tablet: " << create_tablet_req;
-        Status status = 
StorageEngine::instance()->create_tablet(create_tablet_req, profile);
-        if (!status.ok()) {
-            
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
-            LOG_WARNING("failed to create tablet, reason={}", 
status.to_string())
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", create_tablet_req.tablet_id)
-                    .error(status);
-        } else {
-            ++_s_report_version;
-            // get path hash of the created tablet
-            TabletSharedPtr tablet;
-            {
-                SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
-                tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                        create_tablet_req.tablet_id);
+        if (tablet_meta_info.__isset.skip_write_index_on_load) {
+            std::shared_lock rlock(tablet->get_header_lock());
+            
tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
+                    tablet_meta_info.skip_write_index_on_load);
+            for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
+                rowset_meta->tablet_schema()->set_skip_write_index_on_load(
+                        tablet_meta_info.skip_write_index_on_load);
             }
-            DCHECK(tablet != nullptr);
-            TTabletInfo tablet_info;
-            tablet_info.tablet_id = tablet->table_id();
-            tablet_info.schema_hash = tablet->schema_hash();
-            tablet_info.version = create_tablet_req.version;
-            // Useless but it is a required field in TTabletInfo
-            tablet_info.version_hash = 0;
-            tablet_info.row_count = 0;
-            tablet_info.data_size = 0;
-            tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
-            tablet_info.__set_replica_id(tablet->replica_id());
-            finish_tablet_infos.push_back(tablet_info);
-            LOG_INFO("successfully create tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", create_tablet_req.tablet_id);
+            tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
+                    tablet_meta_info.skip_write_index_on_load);
+            need_to_save = true;
         }
+        if (need_to_save) {
+            std::shared_lock rlock(tablet->get_header_lock());
+            tablet->save_meta();
+        }
+    }
+
+    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
+    if (req.signature != -1) {
         TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_report_version(_s_report_version);
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
         finish_task_request.__set_task_status(status.to_thrift());
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(req.task_type);
+        finish_task_request.__set_signature(req.signature);
+        finish_task(finish_task_request);
+        remove_task_info(req.task_type, req.signature);
     }
 }
 
-DropTableTaskPool::DropTableTaskPool(ExecEnv* env, ThreadModel thread_model)
-        : TaskWorkerPool(TaskWorkerType::DROP_TABLE, env, *env->master_info(), 
thread_model) {
-    _worker_count = config::drop_tablet_worker_count;
-    _cb = [this]() { _drop_tablet_worker_thread_callback(); };
+void check_consistency_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    uint32_t checksum = 0;
+    const auto& check_consistency_req = req.check_consistency_req;
+    EngineChecksumTask engine_task(check_consistency_req.tablet_id,
+                                   check_consistency_req.schema_hash, 
check_consistency_req.version,
+                                   &checksum);
+    Status status = engine_task.execute();
+    if (!status.ok()) {
+        LOG_WARNING("failed to check consistency")
+                .tag("signature", req.signature)
+                .tag("tablet_id", check_consistency_req.tablet_id)
+                .error(status);
+    } else {
+        LOG_INFO("successfully check consistency")
+                .tag("signature", req.signature)
+                .tag("tablet_id", check_consistency_req.tablet_id)
+                .tag("checksum", checksum);
+    }
+
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
+    finish_task_request.__set_request_version(check_consistency_req.version);
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void DropTableTaskPool::_drop_tablet_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
+void report_task_callback(const TMasterInfo& master_info) {
+    TReportRequest request;
+    random_sleep(5);
+    request.__isset.tasks = true;
+    {
+        std::lock_guard lock(s_task_signatures_mtx);
+        auto& tasks = request.tasks;
+        for (auto&& [task_type, signatures] : s_task_signatures) {
+            auto& set = tasks[task_type];
+            for (auto&& signature : signatures) {
+                set.insert(signature);
             }
-
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
-        const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
-        Status status;
-        TabletSharedPtr dropped_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                drop_tablet_req.tablet_id, false);
-        if (dropped_tablet != nullptr) {
-            status = StorageEngine::instance()->tablet_manager()->drop_tablet(
-                    drop_tablet_req.tablet_id, drop_tablet_req.replica_id,
-                    drop_tablet_req.is_drop_table_or_partition);
-        } else {
-            status = Status::NotFound("could not find tablet {}", 
drop_tablet_req.tablet_id);
-        }
-        if (status.ok()) {
-            // if tablet is dropped by fe, then the related txn should also be 
removed
-            
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
-                    dropped_tablet->data_dir()->get_meta(), 
drop_tablet_req.tablet_id,
-                    dropped_tablet->tablet_uid());
-            LOG_INFO("successfully drop tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", drop_tablet_req.tablet_id);
-        } else {
-            LOG_WARNING("failed to drop tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", drop_tablet_req.tablet_id)
-                    .error(status);
         }
+    }
+    request.__set_backend(BackendOptions::get_local_backend());
+    bool succ = handle_report(request, master_info, "task");
+    report_task_total << 1;
+    if (!succ) [[unlikely]] {
+        report_task_failed << 1;
+    }
+}
 
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
+void report_disk_callback(StorageEngine& engine, const TMasterInfo& 
master_info) {
+    // Random sleep 1~5 seconds before doing report.
+    // In order to avoid the problem that the FE receives many report requests 
at the same time
+    // and can not be processed.
+    random_sleep(5);
 
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    TReportRequest request;
+    request.__set_backend(BackendOptions::get_local_backend());
+    request.__isset.disks = true;
+
+    std::vector<DataDirInfo> data_dir_infos;
+    static_cast<void>(engine.get_all_data_dir_info(&data_dir_infos, true /* 
update */));
+
+    for (auto& root_path_info : data_dir_infos) {
+        TDisk disk;
+        disk.__set_root_path(root_path_info.path);
+        disk.__set_path_hash(root_path_info.path_hash);
+        disk.__set_storage_medium(root_path_info.storage_medium);
+        disk.__set_disk_total_capacity(root_path_info.disk_capacity);
+        disk.__set_data_used_capacity(root_path_info.local_used_capacity);
+        disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
+        disk.__set_disk_available_capacity(root_path_info.available);
+        disk.__set_trash_used_capacity(root_path_info.trash_used_capacity);
+        disk.__set_used(root_path_info.is_used);
+        request.disks[root_path_info.path] = disk;
+    }
+    request.__set_num_cores(CpuInfo::num_cores());
+    request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
+                                                 ? 
config::pipeline_executor_size
+                                                 : CpuInfo::num_cores());
+    bool succ = handle_report(request, master_info, "disk");
+    report_disk_total << 1;
+    if (!succ) [[unlikely]] {
+        report_disk_failed << 1;
     }
 }
 
-PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, 
PushWokerType type)
-        : TaskWorkerPool(
-                  type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : 
TaskWorkerType::DELETE,
-                  env, *env->master_info(), thread_model),
-          _push_worker_type(type) {
-    if (_push_worker_type == PushWokerType::LOAD_V2) {
-        _worker_count =
-                config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
+void report_tablet_callback(StorageEngine& engine, const TMasterInfo& 
master_info) {
+    random_sleep(5);
 
-    } else {
-        _worker_count = config::delete_worker_count;
+    TReportRequest request;
+    request.__set_backend(BackendOptions::get_local_backend());
+    request.__isset.tablets = true;
+
+    uint64_t report_version = s_report_version;
+    
static_cast<void>(engine.tablet_manager()->build_all_report_tablets_info(&request.tablets));
+    if (report_version < s_report_version) {
+        // TODO llj This can only reduce the possibility for report error, but 
can't avoid it.
+        // If FE create a tablet in FE meta and send CREATE task to this BE, 
the tablet may not be included in this
+        // report, and the report version has a small probability that it has 
not been updated in time. When FE
+        // receives this report, it is possible to delete the new tablet.
+        LOG(WARNING) << "report version " << report_version << " change to " 
<< s_report_version;
+        
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
+        return;
     }
-    _cb = [this]() { _push_worker_thread_callback(); };
-}
+    int64_t max_compaction_score =
+            
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
+                     
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
+    request.__set_tablet_max_compaction_score(max_compaction_score);
+    request.__set_report_version(report_version);
+
+    // report storage policy and resource
+    auto& storage_policy_list = request.storage_policy;
+    for (auto [id, version] : get_storage_policy_ids()) {
+        auto& storage_policy = storage_policy_list.emplace_back();
+        storage_policy.__set_id(id);
+        storage_policy.__set_version(version);
+    }
+    request.__isset.storage_policy = true;
+    auto& resource_list = request.resource;
+    for (auto [id, version] : get_storage_resource_ids()) {
+        auto& resource = resource_list.emplace_back();
+        resource.__set_id(id);
+        resource.__set_version(version);
+    }
+    request.__isset.resource = true;
 
-void PushTaskPool::_push_worker_thread_callback() {
-    // gen high priority worker thread
-    TPriority::type priority = TPriority::NORMAL;
-    int32_t push_worker_count_high_priority = 
config::push_worker_count_high_priority;
-    if (_push_worker_type == PushWokerType::LOAD_V2) {
-        static uint32_t s_worker_count = 0;
-        std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
-        if (s_worker_count < push_worker_count_high_priority) {
-            ++s_worker_count;
-            priority = TPriority::HIGH;
-        }
+    bool succ = handle_report(request, master_info, "tablet");
+    report_tablet_total << 1;
+    if (!succ) [[unlikely]] {
+        report_tablet_failed << 1;
     }
+}
 
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+void upload_callback(StorageEngine& engine, ExecEnv* env, const 
TAgentTaskRequest& req) {
+    const auto& upload_request = req.upload_req;
 
-            if (priority == TPriority::HIGH) {
-                const auto it = std::find_if(
-                        _tasks.cbegin(), _tasks.cend(), [](const 
TAgentTaskRequest& req) {
-                            return req.__isset.priority && req.priority == 
TPriority::HIGH;
-                        });
-
-                if (it == _tasks.cend()) {
-                    // there is no high priority task. notify other thread to 
handle normal task
-                    _worker_thread_condition_variable.notify_all();
-                    sleep(1);
-                    continue;
-                }
-                agent_task_req = *it;
-                _tasks.erase(it);
-            } else {
-                agent_task_req = _tasks.front();
-                _tasks.pop_front();
-            }
-        }
-        TPushReq& push_req = agent_task_req.push_req;
+    LOG(INFO) << "get upload task. signature=" << req.signature
+              << ", job_id=" << upload_request.job_id;
+
+    std::map<int64_t, std::vector<std::string>> tablet_files;
+    std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
+            env, upload_request.job_id, req.signature, 
upload_request.broker_addr,
+            upload_request.broker_prop);
+    Status status =
+            loader->init(upload_request.__isset.storage_backend ? 
upload_request.storage_backend
+                                                                : 
TStorageBackendType::type::BROKER,
+                         upload_request.__isset.location ? 
upload_request.location : "");
+    if (status.ok()) {
+        status = loader->upload(upload_request.src_dest_map, &tablet_files);
+    }
 
-        LOG(INFO) << "get push task. signature=" << agent_task_req.signature
-                  << ", priority=" << priority << " push_type=" << 
push_req.push_type;
-        std::vector<TTabletInfo> tablet_infos;
+    if (!status.ok()) {
+        LOG_WARNING("failed to upload")
+                .tag("signature", req.signature)
+                .tag("job_id", upload_request.job_id)
+                .error(status);
+    } else {
+        LOG_INFO("successfully upload")
+                .tag("signature", req.signature)
+                .tag("job_id", upload_request.job_id);
+    }
 
-        EngineBatchLoadTask engine_task(push_req, &tablet_infos);
-        auto status = StorageEngine::instance()->execute_task(&engine_task);
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task_request.__set_tablet_files(tablet_files);
 
-        // Return result to fe
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        if (push_req.push_type == TPushType::DELETE) {
-            finish_task_request.__set_request_version(push_req.version);
-        }
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
+}
 
+void download_callback(StorageEngine& engine, ExecEnv* env, const 
TAgentTaskRequest& req) {
+    const auto& download_request = req.download_req;
+    LOG(INFO) << "get download task. signature=" << req.signature
+              << ", job_id=" << download_request.job_id
+              << ", task detail: " << 
apache::thrift::ThriftDebugString(download_request);
+
+    // TODO: download
+    std::vector<int64_t> downloaded_tablet_ids;
+
+    auto status = Status::OK();
+    if (download_request.__isset.remote_tablet_snapshots) {
+        std::unique_ptr<SnapshotLoader> loader =
+                std::make_unique<SnapshotLoader>(env, download_request.job_id, 
req.signature);
+        status = 
loader->remote_http_download(download_request.remote_tablet_snapshots,
+                                              &downloaded_tablet_ids);
+    } else {
+        std::unique_ptr<SnapshotLoader> loader = 
std::make_unique<SnapshotLoader>(
+                env, download_request.job_id, req.signature, 
download_request.broker_addr,
+                download_request.broker_prop);
+        status = loader->init(download_request.__isset.storage_backend
+                                      ? download_request.storage_backend
+                                      : TStorageBackendType::type::BROKER,
+                              download_request.__isset.location ? 
download_request.location : "");
         if (status.ok()) {
-            LOG_INFO("successfully execute push task")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", push_req.tablet_id)
-                    .tag("push_type", push_req.push_type);
-            ++_s_report_version;
-            finish_task_request.__set_finish_tablet_infos(tablet_infos);
-        } else {
-            LOG_WARNING("failed to execute push task")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", push_req.tablet_id)
-                    .tag("push_type", push_req.push_type)
-                    .error(status);
+            status = loader->download(download_request.src_dest_map, 
&downloaded_tablet_ids);
         }
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_report_version(_s_report_version);
+    }
 
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    if (!status.ok()) {
+        LOG_WARNING("failed to download")
+                .tag("signature", req.signature)
+                .tag("job_id", download_request.job_id)
+                .error(status);
+    } else {
+        LOG_INFO("successfully download")
+                .tag("signature", req.signature)
+                .tag("job_id", download_request.job_id);
     }
-}
 
-PublishVersionTaskPool::PublishVersionTaskPool(ExecEnv* env, ThreadModel 
thread_model)
-        : TaskWorkerPool(TaskWorkerType::PUBLISH_VERSION, env, 
*env->master_info(), thread_model) {
-    _worker_count = config::publish_version_worker_count;
-    _cb = [this]() { _publish_version_worker_thread_callback(); };
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids);
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
+void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& 
req) {
+    const auto& snapshot_request = req.snapshot_req;
+
+    LOG(INFO) << "get snapshot task. signature=" << req.signature;
+
+    string snapshot_path;
+    bool allow_incremental_clone = false; // not used
+    std::vector<string> snapshot_files;
+    Status status = 
SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path,
+                                                               
&allow_incremental_clone);
+    if (status.ok() && snapshot_request.__isset.list_files) {
+        // list and save all snapshot files
+        // snapshot_path like: data/snapshot/20180417205230.1.86400
+        // we need to add subdir: tablet_id/schema_hash/
+        std::vector<io::FileInfo> files;
+        bool exists = true;
+        io::Path path = fmt::format("{}/{}/{}/", snapshot_path, 
snapshot_request.tablet_id,
+                                    snapshot_request.schema_hash);
+        status = io::global_local_filesystem()->list(path, true, &files, 
&exists);
+        if (status.ok()) {
+            for (auto& file : files) {
+                snapshot_files.push_back(file.file_name);
             }
-
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
         }
+    }
+    if (!status.ok()) {
+        LOG_WARNING("failed to make snapshot")
+                .tag("signature", req.signature)
+                .tag("tablet_id", snapshot_request.tablet_id)
+                .tag("version", snapshot_request.version)
+                .error(status);
+    } else {
+        LOG_INFO("successfully make snapshot")
+                .tag("signature", req.signature)
+                .tag("tablet_id", snapshot_request.tablet_id)
+                .tag("version", snapshot_request.version)
+                .tag("snapshot_path", snapshot_path);
+    }
 
-        const TPublishVersionRequest& publish_version_req = 
agent_task_req.publish_version_req;
-        DorisMetrics::instance()->publish_task_request_total->increment(1);
-        VLOG_NOTICE << "get publish version task. signature=" << 
agent_task_req.signature;
-
-        std::set<TTabletId> error_tablet_ids;
-        std::map<TTabletId, TVersion> succ_tablets;
-        // partition_id, tablet_id, publish_version
-        std::vector<std::tuple<int64_t, int64_t, int64_t>> 
discontinuous_version_tablets;
-        std::map<TTableId, int64_t> table_id_to_num_delta_rows;
-        uint32_t retry_time = 0;
-        Status status;
-        bool is_task_timeout = false;
-        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
-            succ_tablets.clear();
-            error_tablet_ids.clear();
-            table_id_to_num_delta_rows.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, 
&error_tablet_ids,
-                                                 &succ_tablets, 
&discontinuous_version_tablets,
-                                                 &table_id_to_num_delta_rows);
-            status = StorageEngine::instance()->execute_task(&engine_task);
-            if (status.ok()) {
-                break;
-            } else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
-                int64_t time_elapsed = time(nullptr) - 
agent_task_req.recv_time;
-                if (time_elapsed > config::publish_version_task_timeout_s) {
-                    LOG(INFO) << "task elapsed " << time_elapsed
-                              << " seconds since it is inserted to queue, it 
is timeout";
-                    is_task_timeout = true;
-                } else {
-                    // version not continuous, put to queue and wait pre 
version publish
-                    // task execute
-                    std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-                    _tasks.push_back(agent_task_req);
-                    _worker_thread_condition_variable.notify_one();
-                }
-                LOG_EVERY_SECOND(INFO) << "wait for previous publish version 
task to be done, "
-                                       << "transaction_id: " << 
publish_version_req.transaction_id;
-                break;
-            } else {
-                LOG_WARNING("failed to publish version")
-                        .tag("transaction_id", 
publish_version_req.transaction_id)
-                        .tag("error_tablets_num", error_tablet_ids.size())
-                        .tag("retry_time", retry_time)
-                        .error(status);
-                ++retry_time;
-                std::this_thread::sleep_for(std::chrono::seconds(1));
-            }
-        }
-        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
-            continue;
-        }
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_snapshot_path(snapshot_path);
+    finish_task_request.__set_snapshot_files(snapshot_files);
+    finish_task_request.__set_task_status(status.to_thrift());
 
-        for (auto& item : discontinuous_version_tablets) {
-            StorageEngine::instance()->add_async_publish_task(
-                    std::get<0>(item), std::get<1>(item), std::get<2>(item),
-                    publish_version_req.transaction_id, false);
-        }
-        TFinishTaskRequest finish_task_request;
-        if (!status) {
-            DorisMetrics::instance()->publish_task_failed_total->increment(1);
-            // if publish failed, return failed, FE will ignore this error and
-            // check error tablet ids and FE will also republish this task
-            LOG_WARNING("failed to publish version")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("error_tablets_num", error_tablet_ids.size())
-                    .error(status);
-        } else {
-            if (!config::disable_auto_compaction &&
-                !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
-                for (auto [tablet_id, _] : succ_tablets) {
-                    TabletSharedPtr tablet =
-                            
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
-                    if (tablet != nullptr) {
-                        int64_t published_count =
-                                tablet->published_count.fetch_add(1, 
std::memory_order_relaxed);
-                        if (published_count % 10 == 0) {
-                            auto st = 
StorageEngine::instance()->submit_compaction_task(
-                                    tablet, 
CompactionType::CUMULATIVE_COMPACTION, true);
-                            if (!st.ok()) [[unlikely]] {
-                                LOG(WARNING) << "trigger compaction failed, 
tablet_id=" << tablet_id
-                                             << ", published=" << 
published_count << " : " << st;
-                            } else {
-                                LOG(INFO) << "trigger compaction succ, 
tablet_id:" << tablet_id
-                                          << ", published:" << published_count;
-                            }
-                        }
-                    } else {
-                        LOG(WARNING) << "trigger compaction failed, 
tablet_id:" << tablet_id;
-                    }
-                }
-            }
-            uint32_t cost_second = time(nullptr) - agent_task_req.recv_time;
-            g_publish_version_latency << cost_second;
-            LOG_INFO("successfully publish version")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("tablets_num", succ_tablets.size())
-                    .tag("cost(s)", cost_second);
-        }
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
+}
 
-        status.to_thrift(&finish_task_request.task_status);
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_report_version(_s_report_version);
-        finish_task_request.__set_succ_tablets(succ_tablets);
-        finish_task_request.__set_error_tablet_ids(
-                std::vector<TTabletId>(error_tablet_ids.begin(), 
error_tablet_ids.end()));
-        
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& 
req) {
+    const auto& release_snapshot_request = req.release_snapshot_req;
+
+    LOG(INFO) << "get release snapshot task. signature=" << req.signature;
+
+    const string& snapshot_path = release_snapshot_request.snapshot_path;
+    Status status = 
SnapshotManager::instance()->release_snapshot(snapshot_path);
+    if (!status.ok()) {
+        LOG_WARNING("failed to release snapshot")
+                .tag("signature", req.signature)
+                .tag("snapshot_path", snapshot_path)
+                .error(status);
+    } else {
+        LOG_INFO("successfully release snapshot")
+                .tag("signature", req.signature)
+                .tag("snapshot_path", snapshot_path);
     }
+
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-ClearTransactionTaskPool::ClearTransactionTaskPool(ExecEnv* env, ThreadModel 
thread_model)
-        : TaskWorkerPool(TaskWorkerType::CLEAR_TRANSACTION_TASK, env, 
*env->master_info(),
-                         thread_model) {
-    _worker_count = config::clear_transaction_task_worker_count;
-    _cb = [this]() { _clear_transaction_task_worker_thread_callback(); };
+void move_dir_callback(StorageEngine& engine, ExecEnv* env, const 
TAgentTaskRequest& req) {
+    const auto& move_dir_req = req.move_dir_req;
+
+    LOG(INFO) << "get move dir task. signature=" << req.signature
+              << ", job_id=" << move_dir_req.job_id;
+    Status status;
+    auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id);
+    if (tablet == nullptr) {
+        status = Status::InvalidArgument("Could not find tablet");
+    } else {
+        SnapshotLoader loader(env, move_dir_req.job_id, 
move_dir_req.tablet_id);
+        status = loader.move(move_dir_req.src, tablet, true);
+    }
+
+    if (!status.ok()) {
+        LOG_WARNING("failed to move dir")
+                .tag("signature", req.signature)
+                .tag("job_id", move_dir_req.job_id)
+                .tag("tablet_id", move_dir_req.tablet_id)
+                .tag("src", move_dir_req.src)
+                .error(status);
+    } else {
+        LOG_INFO("successfully move dir")
+                .tag("signature", req.signature)
+                .tag("job_id", move_dir_req.job_id)
+                .tag("tablet_id", move_dir_req.tablet_id)
+                .tag("src", move_dir_req.src);
+    }
+
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void 
ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+void submit_table_compaction_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    const auto& compaction_req = req.compaction_req;
+
+    LOG(INFO) << "get compaction task. signature=" << req.signature
+              << ", compaction_type=" << compaction_req.type;
+
+    CompactionType compaction_type;
+    if (compaction_req.type == "base") {
+        compaction_type = CompactionType::BASE_COMPACTION;
+    } else {
+        compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+    }
 
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
+    auto tablet_ptr = 
engine.tablet_manager()->get_tablet(compaction_req.tablet_id);
+    if (tablet_ptr != nullptr) {
+        auto* data_dir = tablet_ptr->data_dir();
+        if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), 
compaction_type)) {
+            LOG(WARNING) << "could not do compaction. tablet_id=" << 
tablet_ptr->tablet_id()
+                         << ", compaction_type=" << compaction_type;
+            return;
+        }
+
+        Status status = engine.submit_compaction_task(tablet_ptr, 
compaction_type, false);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to submit table compaction task. error=" 
<< status;
         }
-        const TClearTransactionTaskRequest& clear_transaction_task_req =
-                agent_task_req.clear_transaction_task_req;
-        LOG(INFO) << "get clear transaction task. signature=" << 
agent_task_req.signature
-                  << ", transaction_id=" << 
clear_transaction_task_req.transaction_id
-                  << ", partition_id_size=" << 
clear_transaction_task_req.partition_id.size();
-
-        Status status;
-
-        if (clear_transaction_task_req.transaction_id > 0) {
-            // transaction_id should be greater than zero.
-            // If it is not greater than zero, no need to execute
-            // the following clear_transaction_task() function.
-            if (!clear_transaction_task_req.partition_id.empty()) {
-                StorageEngine::instance()->clear_transaction_task(
-                        clear_transaction_task_req.transaction_id,
-                        clear_transaction_task_req.partition_id);
+    }
+}
+
+void push_storage_policy_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    const auto& push_storage_policy_req = req.push_storage_policy_req;
+    // refresh resource
+    for (auto& resource : push_storage_policy_req.resource) {
+        auto existed_resource = get_storage_resource(resource.id);
+        if (existed_resource.version >= resource.version) {
+            continue;
+        }
+        if (resource.__isset.s3_storage_param) {
+            Status st;
+            S3Conf s3_conf;
+            s3_conf.ak = std::move(resource.s3_storage_param.ak);
+            s3_conf.sk = std::move(resource.s3_storage_param.sk);
+            s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
+            s3_conf.region = std::move(resource.s3_storage_param.region);
+            s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
+            s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
+            s3_conf.connect_timeout_ms = 
resource.s3_storage_param.conn_timeout_ms;
+            s3_conf.max_connections = resource.s3_storage_param.max_conn;
+            s3_conf.request_timeout_ms = 
resource.s3_storage_param.request_timeout_ms;
+            // When using cold heat separation in minio, user might use ip 
address directly,
+            // which needs enable use_virtual_addressing to true
+            s3_conf.use_virtual_addressing = 
!resource.s3_storage_param.use_path_style;
+            std::shared_ptr<io::S3FileSystem> fs;
+            if (existed_resource.fs == nullptr) {
+                st = io::S3FileSystem::create(s3_conf, 
std::to_string(resource.id), &fs);
+            } else {
+                fs = 
std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
+                fs->set_conf(s3_conf);
+            }
+            if (!st.ok()) {
+                LOG(WARNING) << "update s3 resource failed: " << st;
+            } else {
+                LOG_INFO("successfully update s3 resource")
+                        .tag("resource_id", resource.id)
+                        .tag("resource_name", resource.name)
+                        .tag("s3_conf", s3_conf.to_string());
+                put_storage_resource(resource.id, {std::move(fs), 
resource.version});
+            }
+        } else if (resource.__isset.hdfs_storage_param) {
+            Status st;
+            std::shared_ptr<io::HdfsFileSystem> fs;
+            if (existed_resource.fs == nullptr) {
+                st = io::HdfsFileSystem::create(resource.hdfs_storage_param, 
"", nullptr, &fs);
+            } else {
+                fs = 
std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
+            }
+            if (!st.ok()) {
+                LOG(WARNING) << "update hdfs resource failed: " << st;
             } else {
-                StorageEngine::instance()->clear_transaction_task(
-                        clear_transaction_task_req.transaction_id);
+                LOG_INFO("successfully update hdfs resource")
+                        .tag("resource_id", resource.id)
+                        .tag("resource_name", resource.name);
+                put_storage_resource(resource.id, {std::move(fs), 
resource.version});
             }
-            LOG(INFO) << "finish to clear transaction task. signature=" << 
agent_task_req.signature
-                      << ", transaction_id=" << 
clear_transaction_task_req.transaction_id;
         } else {
-            LOG(WARNING) << "invalid transaction id " << 
clear_transaction_task_req.transaction_id
-                         << ". signature= " << agent_task_req.signature;
+            LOG(WARNING) << "unknown resource=" << resource;
+        }
+    }
+    // drop storage policy
+    for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
+        delete_storage_policy(policy_id);
+    }
+    // refresh storage policy
+    for (auto& storage_policy : push_storage_policy_req.storage_policy) {
+        auto existed_storage_policy = get_storage_policy(storage_policy.id);
+        if (existed_storage_policy == nullptr ||
+            existed_storage_policy->version < storage_policy.version) {
+            auto storage_policy1 = std::make_shared<StoragePolicy>();
+            storage_policy1->name = std::move(storage_policy.name);
+            storage_policy1->version = storage_policy.version;
+            storage_policy1->cooldown_datetime = 
storage_policy.cooldown_datetime;
+            storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
+            storage_policy1->resource_id = storage_policy.resource_id;
+            LOG_INFO("successfully update storage policy")
+                    .tag("storage_policy_id", storage_policy.id)
+                    .tag("storage_policy", storage_policy1->to_string());
+            put_storage_policy(storage_policy.id, std::move(storage_policy1));
         }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
     }
 }
 
-AlterTableTaskPool::AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model)
-        : TaskWorkerPool(TaskWorkerType::ALTER_TABLE, env, 
*env->master_info(), thread_model) {
-    _worker_count = config::alter_tablet_worker_count;
-    _cb = [this]() { _alter_tablet_worker_thread_callback(); };
+void push_cooldown_conf_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
+    const auto& push_cooldown_conf_req = req.push_cooldown_conf;
+    for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
+        int64_t tablet_id = cooldown_conf.tablet_id;
+        TabletSharedPtr tablet = 
engine.tablet_manager()->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
+            continue;
+        }
+        if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
+                                         cooldown_conf.cooldown_replica_id) &&
+            cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
+            tablet->tablet_meta()->cooldown_meta_id().initialized()) {
+            Tablet::async_write_cooldown_meta(tablet);
+        }
+    }
 }
 
-void AlterTableTaskPool::_alter_tablet_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
+void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& 
req) {
+    const auto& create_tablet_req = req.create_tablet_req;
+    RuntimeProfile runtime_profile("CreateTablet");
+    RuntimeProfile* profile = &runtime_profile;
+    MonotonicStopWatch watch;
+    watch.start();
+    SCOPED_CLEANUP({
+        auto elapsed_time = static_cast<int64_t>(watch.elapsed_time());
+        if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
+            COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
+            std::stringstream ss;
+            profile->pretty_print(&ss);
+            LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << 
std::endl << ss.str();
         }
-        int64_t signature = agent_task_req.signature;
-        LOG(INFO) << "get alter table task, signature: " << signature;
-        bool is_task_timeout = false;
-        if (agent_task_req.__isset.recv_time) {
-            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
-            if (time_elapsed > config::report_task_interval_seconds * 20) {
-                LOG(INFO) << "task elapsed " << time_elapsed
-                          << " seconds since it is inserted to queue, it is 
timeout";
-                is_task_timeout = true;
-            }
-        }
-        if (!is_task_timeout) {
-            TFinishTaskRequest finish_task_request;
-            TTaskType::type task_type = agent_task_req.task_type;
-            switch (task_type) {
-            case TTaskType::ALTER:
-                _alter_tablet(agent_task_req, signature, task_type, 
&finish_task_request);
-                break;
-            default:
-                // pass
-                break;
-            }
-            _finish_task(finish_task_request);
+    });
+    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
+    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;
+
+    std::vector<TTabletInfo> finish_tablet_infos;
+    VLOG_NOTICE << "create tablet: " << create_tablet_req;
+    Status status = engine.create_tablet(create_tablet_req, profile);
+    if (!status.ok()) {
+        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
+        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
+                .tag("signature", req.signature)
+                .tag("tablet_id", create_tablet_req.tablet_id)
+                .error(status);
+    } else {
+        s_report_version.fetch_add(1, std::memory_order_relaxed);
+        // get path hash of the created tablet
+        TabletSharedPtr tablet;
+        {
+            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
+            tablet = 
engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
         }
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+        DCHECK(tablet != nullptr);
+        TTabletInfo tablet_info;
+        tablet_info.tablet_id = tablet->table_id();
+        tablet_info.schema_hash = tablet->schema_hash();
+        tablet_info.version = create_tablet_req.version;
+        // Useless but it is a required field in TTabletInfo
+        tablet_info.version_hash = 0;
+        tablet_info.row_count = 0;
+        tablet_info.data_size = 0;
+        tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
+        tablet_info.__set_replica_id(tablet->replica_id());
+        finish_tablet_infos.push_back(tablet_info);
+        LOG_INFO("successfully create tablet")
+                .tag("signature", req.signature)
+                .tag("tablet_id", create_tablet_req.tablet_id);
     }
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_report_version(s_report_version);
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& 
agent_task_req, int64_t signature,
-                                       const TTaskType::type task_type,
-                                       TFinishTaskRequest* 
finish_task_request) {
+void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) 
{
+    const auto& drop_tablet_req = req.drop_tablet_req;
     Status status;
-
-    string process_name;
-    switch (task_type) {
-    case TTaskType::ALTER:
-        process_name = "alter tablet";
-        break;
-    default:
-        std::string task_name;
-        EnumToString(TTaskType, task_type, task_name);
-        LOG(WARNING) << "schema change type invalid. type: " << task_name
-                     << ", signature: " << signature;
-        status = Status::NotSupported("Schema change type invalid");
-        break;
+    auto dropped_tablet = 
engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false);
+    if (dropped_tablet != nullptr) {
+        status = 
engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id,
+                                                      
drop_tablet_req.replica_id,
+                                                      
drop_tablet_req.is_drop_table_or_partition);
+    } else {
+        status = Status::NotFound("could not find tablet {}", 
drop_tablet_req.tablet_id);
     }
-
-    // Check last schema change status, if failed delete tablet file
-    // Do not need to adjust delete success or not
-    // Because if delete failed create rollup will failed
-    TTabletId new_tablet_id = 0;
-    TSchemaHash new_schema_hash = 0;
     if (status.ok()) {
-        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
-        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
-        EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
-        status = StorageEngine::instance()->execute_task(&engine_task);
+        // if tablet is dropped by fe, then the related txn should also be 
removed
+        engine.txn_manager()->force_rollback_tablet_related_txns(
+                dropped_tablet->data_dir()->get_meta(), 
drop_tablet_req.tablet_id,
+                dropped_tablet->tablet_uid());
+        LOG_INFO("successfully drop tablet")
+                .tag("signature", req.signature)
+                .tag("tablet_id", drop_tablet_req.tablet_id);
+    } else {
+        LOG_WARNING("failed to drop tablet")
+                .tag("signature", req.signature)
+                .tag("tablet_id", drop_tablet_req.tablet_id)
+                .error(status);
     }
 
-    if (status.ok()) {
-        ++_s_report_version;
-    }
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_task_status(status.to_thrift());
 
-    // Return result to fe
-    finish_task_request->__set_backend(BackendOptions::get_local_backend());
-    finish_task_request->__set_report_version(_s_report_version);
-    finish_task_request->__set_task_type(task_type);
-    finish_task_request->__set_signature(signature);
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
+}
 
-    std::vector<TTabletInfo> finish_tablet_infos;
-    if (status.ok()) {
-        TTabletInfo tablet_info;
-        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, 
&tablet_info);
-        if (status.ok()) {
-            finish_tablet_infos.push_back(tablet_info);
-        }
+void push_callback(const TAgentTaskRequest& req) {
+    const auto& push_req = req.push_req;
+
+    LOG(INFO) << "get push task. signature=" << req.signature
+              << " push_type=" << push_req.push_type;
+    std::vector<TTabletInfo> tablet_infos;
+
+    EngineBatchLoadTask engine_task(const_cast<TPushReq&>(push_req), 
&tablet_infos);
+    auto status = engine_task.execute();
+
+    // Return result to fe
+    TFinishTaskRequest finish_task_request;
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    if (push_req.push_type == TPushType::DELETE) {
+        finish_task_request.__set_request_version(push_req.version);
     }
 
-    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
-        LOG_WARNING("failed to {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id)
-                .error(status);
+    if (status.ok()) {
+        LOG_INFO("successfully execute push task")
+                .tag("signature", req.signature)
+                .tag("tablet_id", push_req.tablet_id)
+                .tag("push_type", push_req.push_type);
+        ++s_report_version;
+        finish_task_request.__set_finish_tablet_infos(tablet_infos);
     } else {
-        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
-        LOG_INFO("successfully {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id);
+        LOG_WARNING("failed to execute push task")
+                .tag("signature", req.signature)
+                .tag("tablet_id", push_req.tablet_id)
+                .tag("push_type", push_req.push_type)
+                .error(status);
     }
-    finish_task_request->__set_task_status(status.to_thrift());
+    finish_task_request.__set_task_status(status.to_thrift());
+    finish_task_request.__set_report_version(s_report_version);
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
 }
 
-void TaskWorkerPool::_gc_binlog_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
+PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
+        : TaskWorkerPool("PUBLISH_VERSION", 
config::publish_version_worker_count,
+                         [this](const TAgentTaskRequest& task) { 
publish_version_callback(task); }),
+          _engine(engine) {}
 
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
+PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
 
-        std::unordered_map<int64_t, int64_t> gc_tablet_infos;
-        if (!agent_task_req.__isset.gc_binlog_req) {
-            LOG(WARNING) << "gc binlog task is not valid";
-            return;
-        }
-        if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
-            LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not 
valid";
-            return;
-        }
+void PublishVersionWorkerPool::publish_version_callback(const 
TAgentTaskRequest& req) {

Review Comment:
   warning: method 'publish_version_callback' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/agent/task_worker_pool.h:65:
   ```diff
   -     void publish_version_callback(const TAgentTaskRequest& task);
   +     static void publish_version_callback(const TAgentTaskRequest& task);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to