This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 08a974625c [INLONG-9320][SDK] Support local disaster recovery manager configuration (#9321) 08a974625c is described below commit 08a974625cd025559002a1e27b6c4d9ce5f54e45 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Thu Nov 23 10:31:12 2023 +0800 [INLONG-9320][SDK] Support local disaster recovery manager configuration (#9321) --- .../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 1 + .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 9 ++ .../dataproxy-sdk-cpp/src/core/api_imp.cc | 1 + .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 98 ++++++++++++++++++++++ .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 5 ++ .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 4 + 6 files changed, 118 insertions(+) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h index 9df35d85a8..aa451caee9 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h @@ -90,6 +90,7 @@ private: uint64_t tcp_idle_time_; // The time when tcpclient did not send data uint32_t tcp_detection_interval_; // tcp-client detection interval bool enable_balance_; + bool enable_local_cache_; // auth settings bool need_auth_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index c2bf211b69..70cd2362e2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -115,6 +115,7 @@ void SdkConfig::defaultInit() { max_proxy_num_ = constants::kMaxProxyNum; enable_isolation_ = constants::kEnableIsolation; reserve_proxy_num_ = constants::kReserveProxyNum; + enable_local_cache_ = constants::kEnableLocalCache; local_ip_ = constants::kSerIP; local_port_ = constants::kSerPort; @@ -359,6 +360,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { } else { enable_isolation_ = constants::kEnableIsolation; } + + // enable local cache + if (doc.HasMember("enable_local_cache") && doc["enable_local_cache"].IsBool()) { + const rapidjson::Value &obj = doc["enable_local_cache"]; + enable_local_cache_ = obj.GetBool(); + } else { + enable_local_cache_ = constants::kEnableLocalCache; + } } void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index f058142841..79706a511e 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -104,6 +104,7 @@ int32_t ApiImp::DoInit() { LOG_INFO("inlong dataproxy cpp sdk Init complete!"); ProxyManager::GetInstance()->Init(); + ProxyManager::GetInstance()->ReadLocalCache(); for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) { LOG_INFO("DoInit CheckConf inlong_group_id:" diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 084905e2c9..1c5f440119 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -18,6 +18,7 @@ */ #include "proxy_manager.h" +#include "../config/ini_help.h" #include "../utils/capi_constant.h" #include "../utils/logger.h" #include "../utils/utils.h" @@ -107,6 +108,23 @@ void ProxyManager::DoUpdate() { break; } // request success } + + if (ret != SdkCode::kSuccess) { + if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) { + LOG_WARN("failed to request from manager, use previous " << groupid2cluster.first); + continue; + } + if (!SdkConfig::getInstance()->enable_local_cache_) { + LOG_WARN("failed to request from manager, forbid local cache!"); + continue; + } + meta_data = RecoverFromLocalCache(groupid2cluster.first); + if (meta_data.empty()) { + LOG_WARN("local cache is empty!"); + continue; + } + } + ProxyInfoVec proxyInfoVec; ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec); if (ret != SdkCode::kSuccess) { @@ -117,6 +135,7 @@ void ProxyManager::DoUpdate() { if (!proxyInfoVec.empty()) { unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_); groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec; + cache_proxy_info_[groupid2cluster.first] = meta_data; LOG_INFO("groupid:" << groupid2cluster.first << " success update " << proxyInfoVec.size() << " proxy-ip."); } @@ -127,6 +146,10 @@ void ProxyManager::DoUpdate() { UpdateClusterId2ProxyMap(); + if (SdkConfig::getInstance()->enable_local_cache_) { + WriteLocalCache(); + } + update_mutex_.unlock(); LOG_INFO("finish ProxyManager DoUpdate."); } @@ -343,4 +366,79 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() { << it.second); } } + +void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, + const std::string &meta_data) { + file << "[groupid" << groupid_index << "]" << std::endl; + file << "groupid=" << groupid << std::endl; + file << "proxy_cfg=" << meta_data << std::endl; +} + +void ProxyManager::ReadLocalCache() { + try { + IniFile ini = IniFile(); + if (ini.load(constants::kCacheFile)) { + LOG_INFO("there is no bus list cache file"); + return; + } + int32_t groupid_count = 0; + if (ini.getInt("main", "groupid_count", &groupid_count)) { + LOG_WARN("failed to parse .proxy list.ini file"); + return; + } + for (int32_t i = 0; i < groupid_count; i++) { + std::string groupid_list = "groupid" + std::to_string(i); + std::string groupid, proxy; + if (ini.getString(groupid_list, "groupid", &groupid)) { + LOG_WARN("failed to get from cache file." << groupid); + continue; + } + if (ini.getString(groupid_list, "proxy_cfg", &proxy)) { + LOG_WARN("failed to get cache proxy list" << groupid); + continue; + } + LOG_INFO("read cache file, id:" << groupid << ", local config:" << proxy); + cache_proxy_info_[groupid] = proxy; + } + }catch (...){ + LOG_ERROR("ReadLocalCache error!"); + } +} + +void ProxyManager::WriteLocalCache() { + int32_t groupid_count = 0; + try { + std::ofstream outfile; + outfile.open(constants::kCacheTmpFile, std::ios::out | std::ios::trunc); + + for (auto &it : cache_proxy_info_) { + BuildLocalCache(outfile, groupid_count, it.first, it.second); + groupid_count++; + } + if (outfile) { + if (groupid_count) { + outfile << "[main]" << std::endl; + outfile << "groupid_count=" << groupid_count << std::endl; + } + outfile.close(); + } + if (groupid_count) { + rename(constants::kCacheTmpFile, constants::kCacheFile); + } + } catch (...) { + LOG_ERROR("WriteLocalCache error!"); + } + LOG_INFO("WriteLocalCache bid number:" << groupid_count); +} + +std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) { + std::string meta_data; + auto it = cache_proxy_info_.find(groupid); + if (it != cache_proxy_info_.end()) { + meta_data = it->second; + } + LOG_INFO("RecoverFromLocalCache:" << groupid << ",local cache:" << meta_data); + return meta_data; +} + } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 653185798c..6e1c58ef39 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -50,6 +50,7 @@ private: bool exit_flag_; std::thread update_conf_thread_; volatile bool inited_ = false; + std::unordered_map<std::string, std::string> cache_proxy_info_; int32_t ParseAndGet(const std::string &key, const std::string &meta_data, ProxyInfoVec &proxy_info_vec); @@ -72,6 +73,10 @@ public: bool CheckClusterId(const std::string &cluster_id); void UpdateClusterId2ProxyMap(); void UpdateGroupid2ClusterIdMap(); + void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, const std::string &meta_data); + void ReadLocalCache(); + void WriteLocalCache(); + std::string RecoverFromLocalCache(const std::string&groupid); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index eb17f56313..fe25ee7861 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -46,6 +46,7 @@ static const int32_t kDispatchIntervalSend = 10; static const int32_t kLoadBalanceInterval = 300000; static const int32_t kHeartBeatInterval = 60000; static const bool kEnableBalance = true; +static const bool kEnableLocalCache = true; static const bool kEnablePack = true; static const uint32_t kPackSize = 409600; @@ -101,6 +102,9 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 6, 6, 6, 6, 6, 12, 12, 12, 12, 12, 48, 96, 192, 384, 1000}; +static const char kCacheFile[] = ".proxy_list.ini"; +static const char kCacheTmpFile[] = ".proxy_list.ini.tmp"; + } // namespace constants } // namespace inlong #endif // INLONG_SDK_CONSTANT_H \ No newline at end of file