chaoyli closed pull request #448: Move load tablets to StorageEngine
URL: https://github.com/apache/incubator-doris/pull/448
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a28e71ba..f1c55fe3 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1819,11 +1819,11 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) continue; } #endif - vector<RootPathInfo> root_paths_info; - worker_pool_this->_env->olap_engine()->get_all_root_path_info(&root_paths_info); + vector<DataDirInfo> data_dir_infos; + worker_pool_this->_env->olap_engine()->get_all_data_dir_info(&data_dir_infos); map<string, TDisk> disks; - for (auto root_path_info : root_paths_info) { + for (auto root_path_info : data_dir_infos) { TDisk disk; disk.__set_root_path(root_path_info.path); disk.__set_path_hash(root_path_info.path_hash); diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 48540893..7ef19358 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -437,6 +437,13 @@ OLAPStatus DataDir::deregister_tablet(Tablet* tablet) { return OLAP_SUCCESS; } +void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) { + for (auto& tablet : _tablet_set) { + tablet_infos->push_back(tablet); + } + _tablet_set.clear(); +} + std::string DataDir::get_absolute_shard_path(const std::string& shard_string) { return _path + DATA_PREFIX + "/" + shard_string; } @@ -477,81 +484,4 @@ std::string DataDir::get_root_path_from_schema_hash_path_in_trash( return schema_hash_path_in_trash.parent_path().parent_path().parent_path().parent_path().string(); } -OLAPStatus DataDir::_load_tablet_from_header(StorageEngine* engine, TTabletId tablet_id, - TSchemaHash schema_hash, const std::string& header) { - std::unique_ptr<TabletMeta> tablet_meta(new 
TabletMeta()); - bool parsed = tablet_meta->ParseFromString(header); - if (!parsed) { - LOG(WARNING) << "parse header string failed for tablet_id:" << tablet_id << " schema_hash:" << schema_hash; - return OLAP_ERR_HEADER_PB_PARSE_FAILED; - } - - // init must be called - OLAPStatus res = tablet_meta->init(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init header, tablet_id:" << tablet_id << ", schema_hash:" << schema_hash; - res = TabletMetaManager::remove(this, tablet_id, schema_hash); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "remove header failed. tablet_id:" << tablet_id - << "schema_hash:" << schema_hash - << "store path:" << path(); - } - return OLAP_ERR_HEADER_INIT_FAILED; - } - TabletSharedPtr tablet = - Tablet::create_from_header(tablet_meta.release(), this); - if (tablet == nullptr) { - LOG(WARNING) << "fail to new tablet. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash; - return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; - } - - if (tablet->lastest_version() == nullptr && !tablet->is_schema_changing()) { - LOG(WARNING) << "tablet not in schema change state without delta is invalid." - << "tablet=" << tablet->full_name(); - // tablet state is invalid, drop tablet - tablet->mark_dropped(); - return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; - } - - res = engine->add_tablet(tablet_id, schema_hash, tablet); - if (res != OLAP_SUCCESS) { - // insert existed tablet return OLAP_SUCCESS - if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) { - LOG(WARNING) << "add duplicate tablet. tablet=" << tablet->full_name(); - } - - LOG(WARNING) << "failed to add tablet. tablet=" << tablet->full_name(); - return res; - } - res = tablet->register_tablet_into_dir(); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to register tablet into root path. root_path=" << tablet->storage_root_path_name(); - - if (engine->drop_tablet(tablet_id, schema_hash) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to drop tablet when create tablet failed. 
" - <<"tablet=" << tablet_id << " schema_hash=" << schema_hash; - } - - return res; - } - // load pending data (for realtime push), will add transaction relationship into engine - tablet->load_pending_data(); - - return OLAP_SUCCESS; -} - -OLAPStatus DataDir::load_tablets(StorageEngine* engine) { - auto load_tablet_func = [this, engine](long tablet_id, - long schema_hash, const std::string& value) -> bool { - OLAPStatus status = _load_tablet_from_header(engine, tablet_id, schema_hash, value); - if (status != OLAP_SUCCESS) { - LOG(WARNING) << "load tablet from header failed. status:" << status - << "tablet=" << tablet_id << "." << schema_hash; - }; - return true; - }; - OLAPStatus status = TabletMetaManager::traverse_headers(_meta, load_tablet_func); - return status; -} - } // namespace doris diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index cfd4fe78..f7c4d651 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -29,8 +29,6 @@ namespace doris { -class StorageEngine; - // A DataDir used to manange data in same path. // Now, After DataDir was created, it will never be deleted for easy implementation. 
class DataDir { @@ -45,8 +43,8 @@ class DataDir { bool is_used() const { return _is_used; } void set_is_used(bool is_used) { _is_used = is_used; } int32_t cluster_id() const { return _cluster_id; } - RootPathInfo to_root_path_info() { - RootPathInfo info; + DataDirInfo get_dir_info() { + DataDirInfo info; info.path = _path; info.path_hash = _path_hash; info.is_used = _is_used; @@ -72,6 +70,7 @@ class DataDir { OLAPStatus register_tablet(Tablet* tablet); OLAPStatus deregister_tablet(Tablet* tablet); + void clear_tablets(std::vector<TabletInfo>* tablet_infos); std::string get_absolute_tablet_path(TabletMeta* header, bool with_schema_hash); @@ -81,8 +80,6 @@ class DataDir { static std::string get_root_path_from_schema_hash_path_in_trash(const std::string& schema_hash_dir_in_trash); - OLAPStatus load_tablets(StorageEngine* engine); - private: std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; } Status _init_cluster_id(); @@ -96,12 +93,7 @@ class DataDir { Status _read_cluster_id(const std::string& path, int32_t* cluster_id); Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id); - OLAPStatus _load_tablet_from_header(StorageEngine* engine, TTabletId tablet_id, - TSchemaHash schema_hash, const std::string& header); - private: - friend class StorageEngine; - std::string _path; int64_t _path_hash; int32_t _cluster_id; @@ -128,4 +120,4 @@ class DataDir { OlapMeta* _meta; }; -} +} // namespace doris diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 2b1e1cac..5d45adcd 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -44,8 +44,8 @@ enum CompactionType { CUMULATIVE_COMPACTION = 2 }; -struct RootPathInfo { - RootPathInfo(): +struct DataDirInfo { + DataDirInfo(): capacity(1), available(0), data_used_capacity(0), diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp index 4bdce393..af752d56 100644 --- a/be/src/olap/olap_snapshot.cpp +++ b/be/src/olap/olap_snapshot.cpp 
@@ -747,7 +747,7 @@ OLAPStatus StorageEngine::storage_medium_migrate( res = OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; break; } - res = add_tablet(tablet_id, schema_hash, tablet); + res = _tablet_mgr.add_tablet(tablet_id, schema_hash, tablet, false); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]", res); break; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 61c49c79..fbe69ee6 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1673,10 +1673,11 @@ OLAPStatus SchemaChangeHandler::_create_new_tablet( } // 3. Add tablet to StorageEngine will make it visiable to user - res = StorageEngine::get_instance()->add_tablet( + res = StorageEngine::get_instance()->get_tablet_mgr()->add_tablet( request.tablet_id, request.tablet_schema.schema_hash, - new_tablet); + new_tablet, + false); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("failed to add tablet to StorageEngine. [res=%d tablet='%s']", res, new_tablet->full_name().c_str()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 7e41024c..263d4f7a 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -123,91 +123,49 @@ StorageEngine::~StorageEngine() { clear(); } -OLAPStatus StorageEngine::_load_store(DataDir* store) { - std::string store_path = store->path(); - LOG(INFO) <<"start to load tablets from store_path:" << store_path; +OLAPStatus StorageEngine::_load_data_dir(DataDir* data_dir) { + std::string data_dir_path = data_dir->path(); + LOG(INFO) <<"start to load tablets from data_dir_path:" << data_dir_path; bool is_header_converted = false; - OLAPStatus res = TabletMetaManager::get_header_converted(store, is_header_converted); + OLAPStatus res = TabletMetaManager::get_header_converted(data_dir, is_header_converted); if (res != OLAP_SUCCESS) { LOG(WARNING) << "get convert flag from meta failed"; return res; } if (is_header_converted) { LOG(INFO) << "load 
header from meta"; - OLAPStatus s = store->load_tablets(this); + auto load_tablet_func = [this, data_dir](long tablet_id, + long schema_hash, const std::string& value) -> bool { + OLAPStatus status = this->_tablet_mgr.load_tablet_from_header(data_dir, tablet_id, schema_hash, value); + if (status != OLAP_SUCCESS) { + LOG(WARNING) << "load tablet from header failed. status:" << status + << "tablet=" << tablet_id << "." << schema_hash; + }; + return true; + }; + OLAPStatus s = TabletMetaManager::traverse_headers(data_dir->get_meta(), load_tablet_func); LOG(INFO) << "load header from meta finished"; if (s != OLAP_SUCCESS) { - LOG(WARNING) << "there is failure when loading tablet headers, path:" << store_path; + LOG(WARNING) << "there is failure when loading tablet headers, path:" << data_dir_path; return s; } else { return OLAP_SUCCESS; } - } - - // compatible for old header load method - // walk all directory to load header file - LOG(INFO) << "load headers from header files"; - - // get all shards - set<string> shards; - if (dir_walk(store_path + DATA_PREFIX, &shards, NULL) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to walk dir. [root=" << store_path << "]"; + } else { + // ygl: could not be compatable with old doris data. User has to use previous version to parse + // header file into meta env first. + LOG(WARNING) << "header is not converted to tablet meta yet, could not use this Doris version. " + << "[dir path =" << data_dir_path << "]"; return OLAP_ERR_INIT_FAILED; } - - for (const auto& shard : shards) { - // get all tablets - set<string> tablets; - string one_shard_path = store_path + DATA_PREFIX + '/' + shard; - if (dir_walk(one_shard_path, &tablets, NULL) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to walk dir. 
[root=" << one_shard_path << "]"; - continue; - } - - for (const auto& tablet : tablets) { - // 遍历table目录寻找此table的所有indexedRollupTable,注意不是SegmentGroup,而是Tablet - set<string> schema_hashes; - string one_tablet_path = one_shard_path + '/' + tablet; - if (dir_walk(one_tablet_path, &schema_hashes, NULL) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to walk dir. [root=" << one_tablet_path << "]"; - continue; - } - - for (const auto& schema_hash : schema_hashes) { - TTabletId tablet_id = strtoul(tablet.c_str(), NULL, 10); - TSchemaHash tablet_schema_hash = strtoul(schema_hash.c_str(), NULL, 10); - - // 遍历schema_hash目录寻找此index的所有schema - // 加载失败依然加载下一个Table - if (load_one_tablet( - store, - tablet_id, - tablet_schema_hash, - one_tablet_path + '/' + schema_hash) != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to load one tablet, but continue. [path='%s']", - (one_tablet_path + '/' + schema_hash).c_str()); - } - } - } - } - res = TabletMetaManager::set_converted_flag(store); - LOG(INFO) << "load header from header files finished"; - return res; } -OLAPStatus StorageEngine::load_one_tablet( - DataDir* store, TTabletId tablet_id, SchemaHash schema_hash, - const string& schema_hash_path, bool force) { - return _tablet_mgr.load_one_tablet(store, tablet_id, schema_hash, - schema_hash_path, force); -} - -void StorageEngine::load_stores(const std::vector<DataDir*>& stores) { +void StorageEngine::load_data_dirs(const std::vector<DataDir*>& stores) { std::vector<std::thread> threads; for (auto store : stores) { threads.emplace_back([this, store] { - auto res = _load_store(store); + auto res = _load_data_dir(store); if (res != OLAP_SUCCESS) { LOG(WARNING) << "io error when init load tables. 
res=" << res << ", store=" << store->path(); @@ -264,7 +222,7 @@ OLAPStatus StorageEngine::open() { _max_base_compaction_task_per_disk = (base_compaction_num_threads + file_system_num - 1) / file_system_num; auto stores = get_stores(); - load_stores(stores); + load_data_dirs(stores); // 取消未完成的SchemaChange任务 _tablet_mgr.cancel_unfinished_schema_change(); @@ -352,22 +310,22 @@ std::vector<DataDir*> StorageEngine::get_stores() { template std::vector<DataDir*> StorageEngine::get_stores<false>(); template std::vector<DataDir*> StorageEngine::get_stores<true>(); -OLAPStatus StorageEngine::get_all_root_path_info(vector<RootPathInfo>* root_paths_info) { +OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_infos) { OLAPStatus res = OLAP_SUCCESS; - root_paths_info->clear(); + data_dir_infos->clear(); MonotonicStopWatch timer; timer.start(); int tablet_counter = 0; // get all root path info and construct a path map. - // path -> RootPathInfo - std::map<std::string, RootPathInfo> path_map; + // path -> DataDirInfo + std::map<std::string, DataDirInfo> path_map; { std::lock_guard<std::mutex> l(_store_lock); for (auto& it : _store_map) { std::string path = it.first; - path_map.emplace(path, it.second->to_root_path_info()); + path_map.emplace(path, it.second->get_dir_info()); // if this path is not used, init it's info if (!path_map[path].is_used) { path_map[path].capacity = 1; @@ -381,13 +339,13 @@ OLAPStatus StorageEngine::get_all_root_path_info(vector<RootPathInfo>* root_path // which the tablet belongs to. 
_tablet_mgr.update_root_path_info(&path_map, &tablet_counter); - // add path info to root_paths_info + // add path info to data_dir_infos for (auto& entry : path_map) { - root_paths_info->emplace_back(entry.second); + data_dir_infos->emplace_back(entry.second); } // get available capacity of each path - for (auto& info: *root_paths_info) { + for (auto& info: *data_dir_infos) { if (info.is_used) { _get_path_available_capacity(info.path, &info.available); } @@ -511,10 +469,7 @@ void StorageEngine::_delete_tables_on_unused_root_path() { if (it.second->is_used()) { continue; } - for (auto& tablet : it.second->_tablet_set) { - tablet_info_vec.push_back(tablet); - } - it.second->_tablet_set.clear(); + it.second->clear_tablets(&tablet_info_vec); } if (_used_disk_not_enough(unused_root_path_num, total_root_path_num)) { @@ -573,12 +528,6 @@ bool StorageEngine::check_tablet_id_exist(TTabletId tablet_id) { return _tablet_mgr.check_tablet_id_exist(tablet_id); } -OLAPStatus StorageEngine::add_tablet(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool force) { - - return _tablet_mgr.add_tablet(tablet_id, schema_hash, tablet, force); -} - OLAPStatus StorageEngine::add_transaction( TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id) { @@ -966,8 +915,8 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) { const uint32_t snapshot_expire = config::snapshot_expire_time_sec; const uint32_t trash_expire = config::trash_file_expire_time_sec; const double guard_space = config::disk_capacity_insufficient_percentage / 100.0; - std::vector<RootPathInfo> root_paths_info; - res = get_all_root_path_info(&root_paths_info); + std::vector<DataDirInfo> data_dir_infos; + res = get_all_data_dir_info(&data_dir_infos); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("failed to get root path stat info when sweep trash."); return res; @@ -981,7 +930,7 @@ OLAPStatus 
StorageEngine::start_trash_sweep(double* usage) { } const time_t local_now = mktime(&local_tm_now); //得到当地日历时间 - for (RootPathInfo& info : root_paths_info) { + for (DataDirInfo& info : data_dir_infos) { if (!info.is_used) { continue; } @@ -1428,10 +1377,10 @@ OLAPStatus StorageEngine::finish_clone(TabletSharedPtr tablet, const string& clo } if (is_incremental_clone) { - res = StorageEngine::get_instance()->clone_incremental_data( + res = clone_incremental_data( tablet, clone_header, committed_version); } else { - res = StorageEngine::get_instance()->clone_full_data(tablet, clone_header); + res = clone_full_data(tablet, clone_header); } // if full clone success, need to update cumulative layer point @@ -1470,7 +1419,7 @@ OLAPStatus StorageEngine::obtain_shard_path( return OLAP_ERR_CE_CMD_PARAMS_ERROR; } - auto stores = StorageEngine::get_instance()->get_stores_for_create_tablet(storage_medium); + auto stores = get_stores_for_create_tablet(storage_medium); if (stores.empty()) { OLAP_LOG_WARNING("no available disk can be used to create tablet."); return OLAP_ERR_NO_AVAILABLE_ROOT_PATH; @@ -1506,7 +1455,7 @@ OLAPStatus StorageEngine::load_header( try { auto store_path = boost::filesystem::path(shard_path).parent_path().parent_path().string(); - store = StorageEngine::get_instance()->get_store(store_path); + store = get_store(store_path); if (store == nullptr) { LOG(WARNING) << "invalid shard path, path=" << shard_path; return OLAP_ERR_INVALID_ROOT_PATH; @@ -1521,10 +1470,10 @@ OLAPStatus StorageEngine::load_header( schema_hash_path_stream << shard_path << "/" << request.tablet_id << "/" << request.schema_hash; - res = StorageEngine::get_instance()->load_one_tablet( + res = _tablet_mgr.load_one_tablet( store, request.tablet_id, request.schema_hash, - schema_hash_path_stream.str()); + schema_hash_path_stream.str(), false); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to process load headers. 
[res=%d]", res); return res; @@ -1547,10 +1496,11 @@ OLAPStatus StorageEngine::load_header( schema_hash_path_stream << shard_path << "/" << tablet_id << "/" << schema_hash; - res = StorageEngine::get_instance()->load_one_tablet( + res = _tablet_mgr.load_one_tablet( store, tablet_id, schema_hash, - schema_hash_path_stream.str()); + schema_hash_path_stream.str(), + false); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to process load headers. [res=%d]", res); return res; @@ -1639,7 +1589,7 @@ OLAPStatus StorageEngine::push( return OLAP_ERR_CE_CMD_PARAMS_ERROR; } - TabletSharedPtr tablet = StorageEngine::get_instance()->get_tablet( + TabletSharedPtr tablet = get_tablet( request.tablet_id, request.schema_hash); if (NULL == tablet.get()) { OLAP_LOG_WARNING("false to find tablet. [tablet=%ld schema_hash=%d]", diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 9d83adc0..9ced675a 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -89,15 +89,6 @@ class StorageEngine { const bool is_schema_change_tablet, const TabletSharedPtr ref_tablet); - // Add a tablet pointer to StorageEngine - // If force, drop the existing tablet add this new one - // - // Return OLAP_SUCCESS, if run ok - // OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication - // OLAP_ERR_NOT_INITED, if not inited - OLAPStatus add_tablet(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool force = false); - OLAPStatus add_transaction(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, const PUniqueId& load_id); @@ -160,13 +151,7 @@ class StorageEngine { // Note: 这里只能reload原先已经存在的root path,即re-load启动时就登记的root path // 是允许的,但re-load全新的path是不允许的,因为此处没有彻底更新ce调度器信息 - void load_stores(const std::vector<DataDir*>& stores); - - OLAPStatus load_one_tablet(DataDir* store, - TTabletId tablet_id, - SchemaHash schema_hash, - const std::string& schema_hash_path, - bool force = 
false); + void load_data_dirs(const std::vector<DataDir*>& stores); Cache* index_stream_lru_cache() { return _index_stream_lru_cache; @@ -183,7 +168,7 @@ class StorageEngine { void set_store_used_flag(const std::string& root_path, bool is_used); // @brief 获取所有root_path信息 - OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* root_paths_info); + OLAPStatus get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos); void get_all_available_root_path(std::vector<std::string>* available_paths); @@ -335,6 +320,8 @@ class StorageEngine { } } + TabletManager* get_tablet_mgr() { return &_tablet_mgr; } + private: OLAPStatus check_all_root_path_cluster_id(); @@ -419,6 +406,34 @@ class StorageEngine { OLAPStatus _start_bg_worker(); + // 扫描目录, 加载表 + OLAPStatus _load_data_dir(DataDir* store); + + TabletSharedPtr _find_best_tablet_to_compaction(CompactionType compaction_type); + + OLAPStatus _do_sweep( + const std::string& scan_root, const time_t& local_tm_now, const uint32_t expire); + + // Thread functions + + // base compaction thread process function + void* _base_compaction_thread_callback(void* arg); + + // garbage sweep thread process function. 
clear snapshot and trash folder + void* _garbage_sweeper_thread_callback(void* arg); + + // delete tablet with io error process function + void* _disk_stat_monitor_thread_callback(void* arg); + + // unused index process function + void* _unused_index_thread_callback(void* arg); + + // cumulative process function + void* _cumulative_compaction_thread_callback(void* arg); + + // clean file descriptors cache + void* _fd_cache_clean_callback(void* arg); + private: struct CompactionCandidate { @@ -451,14 +466,6 @@ class StorageEngine { typedef std::map<std::string, uint32_t> file_system_task_count_t; - // 扫描目录, 加载表 - OLAPStatus _load_store(DataDir* store); - - TabletSharedPtr _find_best_tablet_to_compaction(CompactionType compaction_type); - - OLAPStatus _do_sweep( - const std::string& scan_root, const time_t& local_tm_now, const uint32_t expire); - EngineOptions _options; std::mutex _store_lock; std::map<std::string, DataDir*> _store_map; @@ -488,26 +495,6 @@ class StorageEngine { std::unordered_map<SegmentGroup*, std::vector<std::string>> _gc_files; Mutex _gc_mutex; - // Thread functions - - // base compaction thread process function - void* _base_compaction_thread_callback(void* arg); - - // garbage sweep thread process function. 
clear snapshot and trash folder - void* _garbage_sweeper_thread_callback(void* arg); - - // delete tablet with io error process function - void* _disk_stat_monitor_thread_callback(void* arg); - - // unused index process function - void* _unused_index_thread_callback(void* arg); - - // cumulative process function - void* _cumulative_compaction_thread_callback(void* arg); - - // clean file descriptors cache - void* _fd_cache_clean_callback(void* arg); - // thread to monitor snapshot expiry std::thread _garbage_sweeper_thread; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 7708da1e..1ae53793 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -344,7 +344,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, // 4. Add tablet to StorageEngine will make it visiable to user res = add_tablet( - request.tablet_id, request.tablet_schema.schema_hash, tablet); + request.tablet_id, request.tablet_schema.schema_hash, tablet, false); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to add tablet to StorageEngine. 
[res=%d]", res); break; @@ -688,6 +688,69 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com return best_tablet; } +OLAPStatus TabletManager::load_tablet_from_header(DataDir* data_dir, TTabletId tablet_id, + TSchemaHash schema_hash, const std::string& header) { + std::unique_ptr<TabletMeta> tablet_meta(new TabletMeta()); + bool parsed = tablet_meta->ParseFromString(header); + if (!parsed) { + LOG(WARNING) << "parse header string failed for tablet_id:" << tablet_id << " schema_hash:" << schema_hash; + return OLAP_ERR_HEADER_PB_PARSE_FAILED; + } + + // init must be called + OLAPStatus res = tablet_meta->init(); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init header, tablet_id:" << tablet_id << ", schema_hash:" << schema_hash; + res = TabletMetaManager::remove(data_dir, tablet_id, schema_hash); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "remove header failed. tablet_id:" << tablet_id + << "schema_hash:" << schema_hash + << "store path:" << path(); + } + return OLAP_ERR_HEADER_INIT_FAILED; + } + TabletSharedPtr tablet = + Tablet::create_from_header(tablet_meta.release(), data_dir); + if (tablet == nullptr) { + LOG(WARNING) << "fail to new tablet. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash; + return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; + } + + if (tablet->lastest_version() == nullptr && !tablet->is_schema_changing()) { + LOG(WARNING) << "tablet not in schema change state without delta is invalid." + << "tablet=" << tablet->full_name(); + // tablet state is invalid, drop tablet + tablet->mark_dropped(); + return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR; + } + + res = add_tablet(tablet_id, schema_hash, tablet, false); + if (res != OLAP_SUCCESS) { + // insert existed tablet return OLAP_SUCCESS + if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) { + LOG(WARNING) << "add duplicate tablet. tablet=" << tablet->full_name(); + } + + LOG(WARNING) << "failed to add tablet. 
tablet=" << tablet->full_name(); + return res; + } + res = tablet->register_tablet_into_dir(); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to register tablet into root path. root_path=" << tablet->storage_root_path_name(); + + if (drop_tablet(tablet_id, schema_hash, false) != OLAP_SUCCESS) { + LOG(WARNING) << "fail to drop tablet when create tablet failed. " + <<"tablet=" << tablet_id << " schema_hash=" << schema_hash; + } + + return res; + } + // load pending data (for realtime push), will add transaction relationship into engine + tablet->load_pending_data(); + + return OLAP_SUCCESS; +} // load_tablet_from_header + OLAPStatus TabletManager::load_one_tablet( DataDir* store, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path, bool force) { @@ -889,7 +952,7 @@ bool TabletManager::try_schema_change_lock(TTabletId tablet_id) { return res; } // try_schema_change_lock -void TabletManager::update_root_path_info(std::map<std::string, RootPathInfo>* path_map, +void TabletManager::update_root_path_info(std::map<std::string, DataDirInfo>* path_map, int* tablet_counter) { _tablet_map_lock.rdlock(); for (auto& entry : _tablet_map) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 4c5775d3..09ca34e7 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -65,7 +65,7 @@ class TabletManager { // OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication // OLAP_ERR_NOT_INITED, if not inited OLAPStatus add_tablet(TTabletId tablet_id, SchemaHash schema_hash, - const TabletSharedPtr& tablet, bool force = false); + const TabletSharedPtr& tablet, bool force); void cancel_unfinished_schema_change(); @@ -129,7 +129,11 @@ class TabletManager { void get_tablet_stat(TTabletStatResult& result); - OLAPStatus load_one_tablet(DataDir* store, + // parse tablet header msg to generate tablet object + OLAPStatus load_tablet_from_header(DataDir* data_dir, TTabletId tablet_id, + TSchemaHash schema_hash, const 
std::string& header); + + OLAPStatus load_one_tablet(DataDir* data_dir, TTabletId tablet_id, SchemaHash schema_hash, const std::string& schema_hash_path, @@ -149,7 +153,7 @@ class TabletManager { // Prevent schema change executed concurrently. bool try_schema_change_lock(TTabletId tablet_id); - void update_root_path_info(std::map<std::string, RootPathInfo>* path_map, int* tablet_counter); + void update_root_path_info(std::map<std::string, DataDirInfo>* path_map, int* tablet_counter); void update_storage_medium_type_count(uint32_t storage_medium_type_count); diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 41af825b..191da176 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -667,7 +667,7 @@ Status SnapshotLoader::move( LOG(WARNING) << ss.str(); return Status(ss.str()); } - OLAPStatus ost = StorageEngine::get_instance()->load_one_tablet( + OLAPStatus ost = StorageEngine::get_instance()->get_tablet_mgr()->load_one_tablet( store, tablet_id, schema_hash, tablet_path, true); if (ost != OLAP_SUCCESS) { std::stringstream ss; diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp index b61f9baf..9cf694da 100644 --- a/be/test/agent/task_worker_pool_test.cpp +++ b/be/test/agent/task_worker_pool_test.cpp @@ -1629,7 +1629,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) { // Get root path failed, report failed #if 0 - EXPECT_CALL(mock_command_executor, get_all_root_path_info(_)) + EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_)) .Times(1) .WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR)); EXPECT_CALL(mock_master_server_client, report(_, _)) @@ -1638,7 +1638,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) { #endif // Get root path success, report failed - EXPECT_CALL(mock_command_executor, get_all_root_path_info(_)) + EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_)) .Times(1) .WillOnce(Return(OLAPStatus::OLAP_SUCCESS)); 
EXPECT_CALL(mock_master_server_client, report(_, _)) @@ -1647,7 +1647,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) { task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool); // Get root path success, report success - EXPECT_CALL(mock_command_executor, get_all_root_path_info(_)) + EXPECT_CALL(mock_command_executor, get_all_data_dir_info(_)) .Times(1) .WillOnce(Return(OLAPStatus::OLAP_SUCCESS)); EXPECT_CALL(mock_master_server_client, report(_, _)) diff --git a/be/test/olap/mock_command_executor.h b/be/test/olap/mock_command_executor.h index 2cf8c552..e2f40213 100644 --- a/be/test/olap/mock_command_executor.h +++ b/be/test/olap/mock_command_executor.h @@ -86,8 +86,8 @@ class MockCommandExecutor : public StorageEngine { MOCK_METHOD1(reload_root_path, OLAPStatus(const std::string& root_paths)); MOCK_METHOD2(check_tablet_exist, bool(TTabletId tablet_id, TSchemaHash schema_hash)); MOCK_METHOD1( - get_all_root_path_info, - OLAPStatus(std::vector<RootPathInfo>* root_paths_info)); + get_all_data_dir_info, + OLAPStatus(std::vector<DataDirInfo>* data_dir_infos)); MOCK_METHOD2( publish_version, OLAPStatus(const TPublishVersionRequest& request, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org