chaoyli closed pull request #448: Move load tablets to StorageEngine
URL: https://github.com/apache/incubator-doris/pull/448
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index a28e71ba..f1c55fe3 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1819,11 +1819,11 @@ void* 
TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
             continue;
         }
 #endif
-        vector<RootPathInfo> root_paths_info;
-        
worker_pool_this->_env->olap_engine()->get_all_root_path_info(&root_paths_info);
+        vector<DataDirInfo> data_dir_infos;
+        
worker_pool_this->_env->olap_engine()->get_all_data_dir_info(&data_dir_infos);
 
         map<string, TDisk> disks;
-        for (auto root_path_info : root_paths_info) {
+        for (auto root_path_info : data_dir_infos) {
             TDisk disk;
             disk.__set_root_path(root_path_info.path);
             disk.__set_path_hash(root_path_info.path_hash);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 48540893..7ef19358 100755
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -437,6 +437,13 @@ OLAPStatus DataDir::deregister_tablet(Tablet* tablet) {
     return OLAP_SUCCESS;
 }
 
+void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) {
+     for (auto& tablet : _tablet_set) {
+        tablet_infos->push_back(tablet);
+    }
+    _tablet_set.clear();
+}
+
 std::string DataDir::get_absolute_shard_path(const std::string& shard_string) {
     return _path + DATA_PREFIX + "/" + shard_string;
 }
@@ -477,81 +484,4 @@ std::string 
DataDir::get_root_path_from_schema_hash_path_in_trash(
     return 
schema_hash_path_in_trash.parent_path().parent_path().parent_path().parent_path().string();
 }
 
-OLAPStatus DataDir::_load_tablet_from_header(StorageEngine* engine, TTabletId 
tablet_id,
-        TSchemaHash schema_hash, const std::string& header) {
-    std::unique_ptr<TabletMeta> tablet_meta(new TabletMeta());
-    bool parsed = tablet_meta->ParseFromString(header);
-    if (!parsed) {
-        LOG(WARNING) << "parse header string failed for tablet_id:" << 
tablet_id << " schema_hash:" << schema_hash;
-        return OLAP_ERR_HEADER_PB_PARSE_FAILED;
-    }
-
-    // init must be called
-    OLAPStatus res = tablet_meta->init();
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to init header, tablet_id:" << tablet_id << ", 
schema_hash:" << schema_hash;
-        res = TabletMetaManager::remove(this, tablet_id, schema_hash);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "remove header failed. tablet_id:" << tablet_id
-                << "schema_hash:" << schema_hash
-                << "store path:" << path();
-        }
-        return OLAP_ERR_HEADER_INIT_FAILED;
-    }
-    TabletSharedPtr tablet =
-        Tablet::create_from_header(tablet_meta.release(), this);
-    if (tablet == nullptr) {
-        LOG(WARNING) << "fail to new tablet. tablet_id=" << tablet_id << ", 
schema_hash:" << schema_hash;
-        return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
-    }
-
-    if (tablet->lastest_version() == nullptr && !tablet->is_schema_changing()) 
{
-        LOG(WARNING) << "tablet not in schema change state without delta is 
invalid."
-                     << "tablet=" << tablet->full_name();
-        // tablet state is invalid, drop tablet
-        tablet->mark_dropped();
-        return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
-    }
-
-    res = engine->add_tablet(tablet_id, schema_hash, tablet);
-    if (res != OLAP_SUCCESS) {
-        // insert existed tablet return OLAP_SUCCESS
-        if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) {
-            LOG(WARNING) << "add duplicate tablet. tablet=" << 
tablet->full_name();
-        }
-
-        LOG(WARNING) << "failed to add tablet. tablet=" << tablet->full_name();
-        return res;
-    }
-    res = tablet->register_tablet_into_dir();
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to register tablet into root path. root_path=" 
<< tablet->storage_root_path_name();
-
-        if (engine->drop_tablet(tablet_id, schema_hash) != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to drop tablet when create tablet failed. "
-                <<"tablet=" << tablet_id << " schema_hash=" << schema_hash;
-        }
-
-        return res;
-    }
-    // load pending data (for realtime push), will add transaction 
relationship into engine
-    tablet->load_pending_data();
-
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus DataDir::load_tablets(StorageEngine* engine) {
-    auto load_tablet_func = [this, engine](long tablet_id,
-            long schema_hash, const std::string& value) -> bool {
-        OLAPStatus status = _load_tablet_from_header(engine, tablet_id, 
schema_hash, value);
-        if (status != OLAP_SUCCESS) {
-            LOG(WARNING) << "load tablet from header failed. status:" << status
-                << "tablet=" << tablet_id << "." << schema_hash;
-        };
-        return true;
-    };
-    OLAPStatus status = TabletMetaManager::traverse_headers(_meta, 
load_tablet_func);
-    return status;
-}
-
 } // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index cfd4fe78..f7c4d651 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -29,8 +29,6 @@
 
 namespace doris {
 
-class StorageEngine;
-
 // A DataDir used to manange data in same path.
 // Now, After DataDir was created, it will never be deleted for easy 
implementation.
 class DataDir {
@@ -45,8 +43,8 @@ class DataDir {
     bool is_used() const { return _is_used; }
     void set_is_used(bool is_used) { _is_used = is_used; }
     int32_t cluster_id() const { return _cluster_id; }
-    RootPathInfo to_root_path_info() {
-        RootPathInfo info;
+    DataDirInfo get_dir_info() {
+        DataDirInfo info;
         info.path = _path;
         info.path_hash = _path_hash;
         info.is_used = _is_used;
@@ -72,6 +70,7 @@ class DataDir {
 
     OLAPStatus register_tablet(Tablet* tablet);
     OLAPStatus deregister_tablet(Tablet* tablet);
+    void clear_tablets(std::vector<TabletInfo>* tablet_infos);
 
     std::string get_absolute_tablet_path(TabletMeta* header, bool 
with_schema_hash);
 
@@ -81,8 +80,6 @@ class DataDir {
 
     static std::string get_root_path_from_schema_hash_path_in_trash(const 
std::string& schema_hash_dir_in_trash);
 
-    OLAPStatus load_tablets(StorageEngine* engine);
-
 private:
     std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
     Status _init_cluster_id();
@@ -96,12 +93,7 @@ class DataDir {
     Status _read_cluster_id(const std::string& path, int32_t* cluster_id);
     Status _write_cluster_id_to_path(const std::string& path, int32_t 
cluster_id); 
 
-    OLAPStatus _load_tablet_from_header(StorageEngine* engine, TTabletId 
tablet_id,
-                TSchemaHash schema_hash, const std::string& header);
-
 private:
-    friend class StorageEngine;
-    
     std::string _path;
     int64_t _path_hash;
     int32_t _cluster_id;
@@ -128,4 +120,4 @@ class DataDir {
     OlapMeta* _meta;
 };
 
-}
+} // namespace doris
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 2b1e1cac..5d45adcd 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -44,8 +44,8 @@ enum CompactionType {
     CUMULATIVE_COMPACTION = 2
 };
 
-struct RootPathInfo {
-    RootPathInfo():
+struct DataDirInfo {
+    DataDirInfo():
             capacity(1),
             available(0),
             data_used_capacity(0),
diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp
index 4bdce393..af752d56 100644
--- a/be/src/olap/olap_snapshot.cpp
+++ b/be/src/olap/olap_snapshot.cpp
@@ -747,7 +747,7 @@ OLAPStatus StorageEngine::storage_medium_migrate(
             res = OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
             break;
         }
-        res = add_tablet(tablet_id, schema_hash, tablet);
+        res = _tablet_mgr.add_tablet(tablet_id, schema_hash, tablet, false);
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]", 
res);
             break;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 61c49c79..fbe69ee6 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1673,10 +1673,11 @@ OLAPStatus SchemaChangeHandler::_create_new_tablet(
         }
 
         // 3. Add tablet to StorageEngine will make it visiable to user
-        res = StorageEngine::get_instance()->add_tablet(
+        res = StorageEngine::get_instance()->get_tablet_mgr()->add_tablet(
                 request.tablet_id,
                 request.tablet_schema.schema_hash,
-                new_tablet);
+                new_tablet, 
+                false);
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("failed to add tablet to StorageEngine. [res=%d 
tablet='%s']",
                              res, new_tablet->full_name().c_str());
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 7e41024c..263d4f7a 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -123,91 +123,49 @@ StorageEngine::~StorageEngine() {
     clear();
 }
 
-OLAPStatus StorageEngine::_load_store(DataDir* store) {
-    std::string store_path = store->path();
-    LOG(INFO) <<"start to load tablets from store_path:" << store_path;
+OLAPStatus StorageEngine::_load_data_dir(DataDir* data_dir) {
+    std::string data_dir_path = data_dir->path();
+    LOG(INFO) <<"start to load tablets from data_dir_path:" << data_dir_path;
 
     bool is_header_converted = false;
-    OLAPStatus res = TabletMetaManager::get_header_converted(store, 
is_header_converted);
+    OLAPStatus res = TabletMetaManager::get_header_converted(data_dir, 
is_header_converted);
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "get convert flag from meta failed";
         return res;
     }
     if (is_header_converted) {
         LOG(INFO) << "load header from meta";
-        OLAPStatus s = store->load_tablets(this);
+        auto load_tablet_func = [this, data_dir](long tablet_id,
+            long schema_hash, const std::string& value) -> bool {
+            OLAPStatus status = 
this->_tablet_mgr.load_tablet_from_header(data_dir, tablet_id, schema_hash, 
value);
+            if (status != OLAP_SUCCESS) {
+                LOG(WARNING) << "load tablet from header failed. status:" << 
status
+                    << "tablet=" << tablet_id << "." << schema_hash;
+            };
+            return true;
+        };
+        OLAPStatus s = 
TabletMetaManager::traverse_headers(data_dir->get_meta(), load_tablet_func);
         LOG(INFO) << "load header from meta finished";
         if (s != OLAP_SUCCESS) {
-            LOG(WARNING) << "there is failure when loading tablet headers, 
path:" << store_path;
+            LOG(WARNING) << "there is failure when loading tablet headers, 
path:" << data_dir_path;
             return s;
         } else {
             return OLAP_SUCCESS;
         }
-    }
-
-    // compatible for old header load method
-    // walk all directory to load header file
-    LOG(INFO) << "load headers from header files";
-
-    // get all shards
-    set<string> shards;
-    if (dir_walk(store_path + DATA_PREFIX, &shards, NULL) != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to walk dir. [root=" << store_path << "]";
+    } else {
+        // ygl: could not be compatable with old doris data. User has to use 
previous version to parse
+        // header file into meta env first.
+        LOG(WARNING) << "header is not converted to tablet meta yet, could not 
use this Doris version. " 
+                    << "[dir path =" << data_dir_path << "]";
         return OLAP_ERR_INIT_FAILED;
     }
-
-    for (const auto& shard : shards) {
-        // get all tablets
-        set<string> tablets;
-        string one_shard_path = store_path + DATA_PREFIX +  '/' + shard;
-        if (dir_walk(one_shard_path, &tablets, NULL) != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to walk dir. [root=" << one_shard_path << 
"]";
-            continue;
-        }
-
-        for (const auto& tablet : tablets) {
-            // 遍历table目录寻找此table的所有indexedRollupTable,注意不是SegmentGroup,而是Tablet
-            set<string> schema_hashes;
-            string one_tablet_path = one_shard_path + '/' + tablet;
-            if (dir_walk(one_tablet_path, &schema_hashes, NULL) != 
OLAP_SUCCESS) {
-                LOG(WARNING) << "fail to walk dir. [root=" << one_tablet_path 
<< "]";
-                continue;
-            }
-
-            for (const auto& schema_hash : schema_hashes) {
-                TTabletId tablet_id = strtoul(tablet.c_str(), NULL, 10);
-                TSchemaHash tablet_schema_hash = strtoul(schema_hash.c_str(), 
NULL, 10);
-
-                // 遍历schema_hash目录寻找此index的所有schema
-                // 加载失败依然加载下一个Table
-                if (load_one_tablet(
-                        store,
-                        tablet_id,
-                        tablet_schema_hash,
-                        one_tablet_path + '/' + schema_hash) != OLAP_SUCCESS) {
-                    OLAP_LOG_WARNING("fail to load one tablet, but continue. 
[path='%s']",
-                                     (one_tablet_path + '/' + 
schema_hash).c_str());
-                }
-            }
-        }
-    }
-    res = TabletMetaManager::set_converted_flag(store);
-    LOG(INFO) << "load header from header files finished";
-    return res;
 }
 
-OLAPStatus StorageEngine::load_one_tablet(
-        DataDir* store, TTabletId tablet_id, SchemaHash schema_hash,
-        const string& schema_hash_path, bool force) {
-    return _tablet_mgr.load_one_tablet(store, tablet_id, schema_hash,
-        schema_hash_path, force);
-}
-
-void StorageEngine::load_stores(const std::vector<DataDir*>& stores) {
+void StorageEngine::load_data_dirs(const std::vector<DataDir*>& stores) {
     std::vector<std::thread> threads;
     for (auto store : stores) {
         threads.emplace_back([this, store] {
-            auto res = _load_store(store);
+            auto res = _load_data_dir(store);
             if (res != OLAP_SUCCESS) {
                 LOG(WARNING) << "io error when init load tables. res=" << res
                     << ", store=" << store->path();
@@ -264,7 +222,7 @@ OLAPStatus StorageEngine::open() {
     _max_base_compaction_task_per_disk = (base_compaction_num_threads + 
file_system_num - 1) / file_system_num;
 
     auto stores = get_stores();
-    load_stores(stores);
+    load_data_dirs(stores);
     // 取消未完成的SchemaChange任务
     _tablet_mgr.cancel_unfinished_schema_change();
 
@@ -352,22 +310,22 @@ std::vector<DataDir*> StorageEngine::get_stores() {
 template std::vector<DataDir*> StorageEngine::get_stores<false>();
 template std::vector<DataDir*> StorageEngine::get_stores<true>();
 
-OLAPStatus StorageEngine::get_all_root_path_info(vector<RootPathInfo>* 
root_paths_info) {
+OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* 
data_dir_infos) {
     OLAPStatus res = OLAP_SUCCESS;
-    root_paths_info->clear();
+    data_dir_infos->clear();
 
     MonotonicStopWatch timer;
     timer.start();
     int tablet_counter = 0;
 
     // get all root path info and construct a path map.
-    // path -> RootPathInfo
-    std::map<std::string, RootPathInfo> path_map;
+    // path -> DataDirInfo
+    std::map<std::string, DataDirInfo> path_map;
     {
         std::lock_guard<std::mutex> l(_store_lock);
         for (auto& it : _store_map) {
             std::string path = it.first;
-            path_map.emplace(path, it.second->to_root_path_info());
+            path_map.emplace(path, it.second->get_dir_info());
             // if this path is not used, init it's info
             if (!path_map[path].is_used) {
                 path_map[path].capacity = 1;
@@ -381,13 +339,13 @@ OLAPStatus 
StorageEngine::get_all_root_path_info(vector<RootPathInfo>* root_path
     // which the tablet belongs to.
     _tablet_mgr.update_root_path_info(&path_map, &tablet_counter);
 
-    // add path info to root_paths_info
+    // add path info to data_dir_infos
     for (auto& entry : path_map) {
-        root_paths_info->emplace_back(entry.second);
+        data_dir_infos->emplace_back(entry.second);
     }
 
     // get available capacity of each path
-    for (auto& info: *root_paths_info) {
+    for (auto& info: *data_dir_infos) {
         if (info.is_used) {
             _get_path_available_capacity(info.path,  &info.available);
         }
@@ -511,10 +469,7 @@ void StorageEngine::_delete_tables_on_unused_root_path() {
         if (it.second->is_used()) {
             continue;
         }
-        for (auto& tablet : it.second->_tablet_set) {
-            tablet_info_vec.push_back(tablet);
-        }
-        it.second->_tablet_set.clear();
+        it.second->clear_tablets(&tablet_info_vec);
     }
 
     if (_used_disk_not_enough(unused_root_path_num, total_root_path_num)) {
@@ -573,12 +528,6 @@ bool StorageEngine::check_tablet_id_exist(TTabletId 
tablet_id) {
     return _tablet_mgr.check_tablet_id_exist(tablet_id);
 }
 
-OLAPStatus StorageEngine::add_tablet(TTabletId tablet_id, SchemaHash 
schema_hash,
-                                 const TabletSharedPtr& tablet, bool force) {
-    
-    return _tablet_mgr.add_tablet(tablet_id, schema_hash, tablet, force);
-}
-
 OLAPStatus StorageEngine::add_transaction(
     TPartitionId partition_id, TTransactionId transaction_id,
     TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) {
@@ -966,8 +915,8 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
     const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
     const uint32_t trash_expire = config::trash_file_expire_time_sec;
     const double guard_space = config::disk_capacity_insufficient_percentage / 
100.0;
-    std::vector<RootPathInfo> root_paths_info;
-    res = get_all_root_path_info(&root_paths_info);
+    std::vector<DataDirInfo> data_dir_infos;
+    res = get_all_data_dir_info(&data_dir_infos);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to get root path stat info when sweep 
trash.");
         return res;
@@ -981,7 +930,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
     }
     const time_t local_now = mktime(&local_tm_now); //得到当地日历时间
 
-    for (RootPathInfo& info : root_paths_info) {
+    for (DataDirInfo& info : data_dir_infos) {
         if (!info.is_used) {
             continue;
         }
@@ -1428,10 +1377,10 @@ OLAPStatus StorageEngine::finish_clone(TabletSharedPtr 
tablet, const string& clo
         }
 
         if (is_incremental_clone) {
-            res = StorageEngine::get_instance()->clone_incremental_data(
+            res = clone_incremental_data(
                                               tablet, clone_header, 
committed_version);
         } else {
-            res = StorageEngine::get_instance()->clone_full_data(tablet, 
clone_header);
+            res = clone_full_data(tablet, clone_header);
         }
 
         // if full clone success, need to update cumulative layer point
@@ -1470,7 +1419,7 @@ OLAPStatus StorageEngine::obtain_shard_path(
         return OLAP_ERR_CE_CMD_PARAMS_ERROR;
     }
 
-    auto stores = 
StorageEngine::get_instance()->get_stores_for_create_tablet(storage_medium);
+    auto stores = get_stores_for_create_tablet(storage_medium);
     if (stores.empty()) {
         OLAP_LOG_WARNING("no available disk can be used to create tablet.");
         return OLAP_ERR_NO_AVAILABLE_ROOT_PATH;
@@ -1506,7 +1455,7 @@ OLAPStatus StorageEngine::load_header(
         try {
             auto store_path =
                 
boost::filesystem::path(shard_path).parent_path().parent_path().string();
-            store = StorageEngine::get_instance()->get_store(store_path);
+            store = get_store(store_path);
             if (store == nullptr) {
                 LOG(WARNING) << "invalid shard path, path=" << shard_path;
                 return OLAP_ERR_INVALID_ROOT_PATH;
@@ -1521,10 +1470,10 @@ OLAPStatus StorageEngine::load_header(
     schema_hash_path_stream << shard_path
                             << "/" << request.tablet_id
                             << "/" << request.schema_hash;
-    res =  StorageEngine::get_instance()->load_one_tablet(
+    res =  _tablet_mgr.load_one_tablet(
             store,
             request.tablet_id, request.schema_hash,
-            schema_hash_path_stream.str());
+            schema_hash_path_stream.str(), false);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to process load headers. [res=%d]", res);
         return res;
@@ -1547,10 +1496,11 @@ OLAPStatus StorageEngine::load_header(
     schema_hash_path_stream << shard_path
                             << "/" << tablet_id
                             << "/" << schema_hash;
-    res =  StorageEngine::get_instance()->load_one_tablet(
+    res =  _tablet_mgr.load_one_tablet(
             store,
             tablet_id, schema_hash,
-            schema_hash_path_stream.str());
+            schema_hash_path_stream.str(), 
+            false);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to process load headers. [res=%d]", res);
         return res;
@@ -1639,7 +1589,7 @@ OLAPStatus StorageEngine::push(
         return OLAP_ERR_CE_CMD_PARAMS_ERROR;
     }
 
-    TabletSharedPtr tablet = StorageEngine::get_instance()->get_tablet(
+    TabletSharedPtr tablet = get_tablet(
             request.tablet_id, request.schema_hash);
     if (NULL == tablet.get()) {
         OLAP_LOG_WARNING("false to find tablet. [tablet=%ld schema_hash=%d]",
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 9d83adc0..9ced675a 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -89,15 +89,6 @@ class StorageEngine {
                               const bool is_schema_change_tablet,
                               const TabletSharedPtr ref_tablet);
 
-    // Add a tablet pointer to StorageEngine
-    // If force, drop the existing tablet add this new one
-    //
-    // Return OLAP_SUCCESS, if run ok
-    //        OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication
-    //        OLAP_ERR_NOT_INITED, if not inited
-    OLAPStatus add_tablet(TTabletId tablet_id, SchemaHash schema_hash,
-                         const TabletSharedPtr& tablet, bool force = false);
-
     OLAPStatus add_transaction(TPartitionId partition_id, TTransactionId 
transaction_id,
                                TTabletId tablet_id, SchemaHash schema_hash,
                                const PUniqueId& load_id);
@@ -160,13 +151,7 @@ class StorageEngine {
 
     // Note: 这里只能reload原先已经存在的root path,即re-load启动时就登记的root path
     // 是允许的,但re-load全新的path是不允许的,因为此处没有彻底更新ce调度器信息
-    void load_stores(const std::vector<DataDir*>& stores);
-
-    OLAPStatus load_one_tablet(DataDir* store,
-                               TTabletId tablet_id,
-                               SchemaHash schema_hash,
-                               const std::string& schema_hash_path,
-                               bool force = false);
+    void load_data_dirs(const std::vector<DataDir*>& stores);
 
     Cache* index_stream_lru_cache() {
         return _index_stream_lru_cache;
@@ -183,7 +168,7 @@ class StorageEngine {
     void set_store_used_flag(const std::string& root_path, bool is_used);
 
     // @brief 获取所有root_path信息
-    OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* 
root_paths_info);
+    OLAPStatus get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos);
 
     void get_all_available_root_path(std::vector<std::string>* 
available_paths);
 
@@ -335,6 +320,8 @@ class StorageEngine {
         }
     }
 
+    TabletManager* get_tablet_mgr() { return &_tablet_mgr; }
+
 private:
     OLAPStatus check_all_root_path_cluster_id();
 
@@ -419,6 +406,34 @@ class StorageEngine {
 
     OLAPStatus _start_bg_worker();
 
+    // 扫描目录, 加载表
+    OLAPStatus _load_data_dir(DataDir* store);
+
+    TabletSharedPtr _find_best_tablet_to_compaction(CompactionType 
compaction_type);
+
+    OLAPStatus _do_sweep(
+            const std::string& scan_root, const time_t& local_tm_now, const 
uint32_t expire);
+
+    // Thread functions
+
+    // base compaction thread process function
+    void* _base_compaction_thread_callback(void* arg);
+
+    // garbage sweep thread process function. clear snapshot and trash folder
+    void* _garbage_sweeper_thread_callback(void* arg);
+
+    // delete tablet with io error process function
+    void* _disk_stat_monitor_thread_callback(void* arg);
+
+    // unused index process function
+    void* _unused_index_thread_callback(void* arg);
+
+    // cumulative process function
+    void* _cumulative_compaction_thread_callback(void* arg);
+
+    // clean file descriptors cache
+    void* _fd_cache_clean_callback(void* arg);
+
 private:
 
     struct CompactionCandidate {
@@ -451,14 +466,6 @@ class StorageEngine {
 
     typedef std::map<std::string, uint32_t> file_system_task_count_t;
 
-    // 扫描目录, 加载表
-    OLAPStatus _load_store(DataDir* store);
-
-    TabletSharedPtr _find_best_tablet_to_compaction(CompactionType 
compaction_type);
-
-    OLAPStatus _do_sweep(
-            const std::string& scan_root, const time_t& local_tm_now, const 
uint32_t expire);
-
     EngineOptions _options;
     std::mutex _store_lock;
     std::map<std::string, DataDir*> _store_map;
@@ -488,26 +495,6 @@ class StorageEngine {
     std::unordered_map<SegmentGroup*, std::vector<std::string>> _gc_files;
     Mutex _gc_mutex;
 
-    // Thread functions
-
-    // base compaction thread process function
-    void* _base_compaction_thread_callback(void* arg);
-
-    // garbage sweep thread process function. clear snapshot and trash folder
-    void* _garbage_sweeper_thread_callback(void* arg);
-
-    // delete tablet with io error process function
-    void* _disk_stat_monitor_thread_callback(void* arg);
-
-    // unused index process function
-    void* _unused_index_thread_callback(void* arg);
-
-    // cumulative process function
-    void* _cumulative_compaction_thread_callback(void* arg);
-
-    // clean file descriptors cache
-    void* _fd_cache_clean_callback(void* arg);
-
     // thread to monitor snapshot expiry
     std::thread _garbage_sweeper_thread;
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 7708da1e..1ae53793 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -344,7 +344,7 @@ OLAPStatus TabletManager::create_tablet(const 
TCreateTabletReq& request,
 
         // 4. Add tablet to StorageEngine will make it visiable to user
         res = add_tablet(
-                request.tablet_id, request.tablet_schema.schema_hash, tablet);
+                request.tablet_id, request.tablet_schema.schema_hash, tablet, 
false);
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]", 
res);
             break;
@@ -688,6 +688,69 @@ TabletSharedPtr 
TabletManager::find_best_tablet_to_compaction(CompactionType com
     return best_tablet;
 }
 
+OLAPStatus TabletManager::load_tablet_from_header(DataDir* data_dir, TTabletId 
tablet_id,
+        TSchemaHash schema_hash, const std::string& header) {
+    std::unique_ptr<TabletMeta> tablet_meta(new TabletMeta());
+    bool parsed = tablet_meta->ParseFromString(header);
+    if (!parsed) {
+        LOG(WARNING) << "parse header string failed for tablet_id:" << 
tablet_id << " schema_hash:" << schema_hash;
+        return OLAP_ERR_HEADER_PB_PARSE_FAILED;
+    }
+
+    // init must be called
+    OLAPStatus res = tablet_meta->init();
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to init header, tablet_id:" << tablet_id << ", 
schema_hash:" << schema_hash;
+        res = TabletMetaManager::remove(data_dir, tablet_id, schema_hash);
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "remove header failed. tablet_id:" << tablet_id
+                << "schema_hash:" << schema_hash
+                << "store path:" << path();
+        }
+        return OLAP_ERR_HEADER_INIT_FAILED;
+    }
+    TabletSharedPtr tablet =
+        Tablet::create_from_header(tablet_meta.release(), data_dir);
+    if (tablet == nullptr) {
+        LOG(WARNING) << "fail to new tablet. tablet_id=" << tablet_id << ", 
schema_hash:" << schema_hash;
+        return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
+    }
+
+    if (tablet->lastest_version() == nullptr && !tablet->is_schema_changing()) 
{
+        LOG(WARNING) << "tablet not in schema change state without delta is 
invalid."
+                     << "tablet=" << tablet->full_name();
+        // tablet state is invalid, drop tablet
+        tablet->mark_dropped();
+        return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
+    }
+
+    res = add_tablet(tablet_id, schema_hash, tablet, false);
+    if (res != OLAP_SUCCESS) {
+        // insert existed tablet return OLAP_SUCCESS
+        if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) {
+            LOG(WARNING) << "add duplicate tablet. tablet=" << 
tablet->full_name();
+        }
+
+        LOG(WARNING) << "failed to add tablet. tablet=" << tablet->full_name();
+        return res;
+    }
+    res = tablet->register_tablet_into_dir();
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to register tablet into root path. root_path=" 
<< tablet->storage_root_path_name();
+
+        if (drop_tablet(tablet_id, schema_hash, false) != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to drop tablet when create tablet failed. "
+                <<"tablet=" << tablet_id << " schema_hash=" << schema_hash;
+        }
+
+        return res;
+    }
+    // load pending data (for realtime push), will add transaction 
relationship into engine
+    tablet->load_pending_data();
+
+    return OLAP_SUCCESS;
+} // load_tablet_from_header
+
 OLAPStatus TabletManager::load_one_tablet(
         DataDir* store, TTabletId tablet_id, SchemaHash schema_hash,
         const string& schema_hash_path, bool force) {
@@ -889,7 +952,7 @@ bool TabletManager::try_schema_change_lock(TTabletId 
tablet_id) {
     return res;
 } // try_schema_change_lock
 
-void TabletManager::update_root_path_info(std::map<std::string, RootPathInfo>* 
path_map, 
+void TabletManager::update_root_path_info(std::map<std::string, DataDirInfo>* 
path_map, 
     int* tablet_counter) {
     _tablet_map_lock.rdlock();
     for (auto& entry : _tablet_map) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 4c5775d3..09ca34e7 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -65,7 +65,7 @@ class TabletManager {
     //        OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication
     //        OLAP_ERR_NOT_INITED, if not inited
     OLAPStatus add_tablet(TTabletId tablet_id, SchemaHash schema_hash,
-                         const TabletSharedPtr& tablet, bool force = false);
+                         const TabletSharedPtr& tablet, bool force);
 
     void cancel_unfinished_schema_change();
 
@@ -129,7 +129,11 @@ class TabletManager {
 
     void get_tablet_stat(TTabletStatResult& result);
 
-    OLAPStatus load_one_tablet(DataDir* store,
+    // parse tablet header msg to generate tablet object
+    OLAPStatus load_tablet_from_header(DataDir* data_dir, TTabletId tablet_id,
+                TSchemaHash schema_hash, const std::string& header);
+
+    OLAPStatus load_one_tablet(DataDir* data_dir,
                                TTabletId tablet_id,
                                SchemaHash schema_hash,
                                const std::string& schema_hash_path,
@@ -149,7 +153,7 @@ class TabletManager {
     // Prevent schema change executed concurrently.
     bool try_schema_change_lock(TTabletId tablet_id);
 
-    void update_root_path_info(std::map<std::string, RootPathInfo>* path_map, int* tablet_counter);
+    void update_root_path_info(std::map<std::string, DataDirInfo>* path_map, int* tablet_counter);
 
     void update_storage_medium_type_count(uint32_t storage_medium_type_count);
 
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 41af825b..191da176 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -667,7 +667,7 @@ Status SnapshotLoader::move(
         LOG(WARNING) << ss.str();
         return Status(ss.str());
     }
-    OLAPStatus ost = StorageEngine::get_instance()->load_one_tablet(
+    OLAPStatus ost = StorageEngine::get_instance()->get_tablet_mgr()->load_one_tablet(
             store, tablet_id, schema_hash, tablet_path, true);
     if (ost != OLAP_SUCCESS) {
         std::stringstream ss;
diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp
index b61f9baf..9cf694da 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -1629,7 +1629,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
 
     // Get root path failed, report failed
 #if 0
-    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
+    EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_))
             .Times(1)
             .WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
     EXPECT_CALL(mock_master_server_client, report(_, _))
@@ -1638,7 +1638,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
 #endif
 
     // Get root path success, report failed
-    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
+    EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_))
             .Times(1)
             .WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
     EXPECT_CALL(mock_master_server_client, report(_, _))
@@ -1647,7 +1647,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
     task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool);
 
     // Get root path success, report success
-    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
+    EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_))
             .Times(1)
             .WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
     EXPECT_CALL(mock_master_server_client, report(_, _))
diff --git a/be/test/olap/mock_command_executor.h b/be/test/olap/mock_command_executor.h
index 2cf8c552..e2f40213 100644
--- a/be/test/olap/mock_command_executor.h
+++ b/be/test/olap/mock_command_executor.h
@@ -86,8 +86,8 @@ class MockCommandExecutor : public StorageEngine {
     MOCK_METHOD1(reload_root_path, OLAPStatus(const std::string& root_paths));
     MOCK_METHOD2(check_tablet_exist, bool(TTabletId tablet_id, TSchemaHash schema_hash));
     MOCK_METHOD1(
-            get_all_root_path_info,
-            OLAPStatus(std::vector<RootPathInfo>* root_paths_info));
+            get_all_data_dir_info,
+            OLAPStatus(std::vector<DataDirInfo>* data_dir_infos));
     MOCK_METHOD2(
             publish_version,
             OLAPStatus(const TPublishVersionRequest& request,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to