This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a974625c [INLONG-9320][SDK] Support local disaster recovery manager 
configuration (#9321)
08a974625c is described below

commit 08a974625cd025559002a1e27b6c4d9ce5f54e45
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Nov 23 10:31:12 2023 +0800

    [INLONG-9320][SDK] Support local disaster recovery manager configuration 
(#9321)
---
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h       |  1 +
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |  9 ++
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          |  1 +
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 98 ++++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  5 ++
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |  4 +
 6 files changed, 118 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 9df35d85a8..aa451caee9 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -90,6 +90,7 @@ private:
   uint64_t tcp_idle_time_;          // The time when tcpclient did not send 
data
   uint32_t tcp_detection_interval_; // tcp-client detection interval
   bool enable_balance_;
+  bool enable_local_cache_;
 
   // auth settings
   bool need_auth_;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index c2bf211b69..70cd2362e2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -115,6 +115,7 @@ void SdkConfig::defaultInit() {
   max_proxy_num_ = constants::kMaxProxyNum;
   enable_isolation_ = constants::kEnableIsolation;
   reserve_proxy_num_ = constants::kReserveProxyNum;
+  enable_local_cache_ = constants::kEnableLocalCache;
 
   local_ip_ = constants::kSerIP;
   local_port_ = constants::kSerPort;
@@ -359,6 +360,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value 
&doc) {
   } else {
     enable_isolation_ = constants::kEnableIsolation;
   }
+
+  // enable local cache
+  if (doc.HasMember("enable_local_cache") && 
doc["enable_local_cache"].IsBool()) {
+    const rapidjson::Value &obj = doc["enable_local_cache"];
+    enable_local_cache_ = obj.GetBool();
+  } else {
+    enable_local_cache_ = constants::kEnableLocalCache;
+  }
 }
 
 void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index f058142841..79706a511e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -104,6 +104,7 @@ int32_t ApiImp::DoInit() {
   LOG_INFO("inlong dataproxy cpp sdk Init complete!");
 
   ProxyManager::GetInstance()->Init();
+  ProxyManager::GetInstance()->ReadLocalCache();
 
   for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) 
{
     LOG_INFO("DoInit CheckConf inlong_group_id:"
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 084905e2c9..1c5f440119 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -18,6 +18,7 @@
  */
 
 #include "proxy_manager.h"
+#include "../config/ini_help.h"
 #include "../utils/capi_constant.h"
 #include "../utils/logger.h"
 #include "../utils/utils.h"
@@ -107,6 +108,23 @@ void ProxyManager::DoUpdate() {
           break;
         } // request success
       }
+
+      if (ret != SdkCode::kSuccess) {
+        if (groupid_2_proxy_map_.find(groupid2cluster.first) != 
groupid_2_proxy_map_.end()) {
+          LOG_WARN("failed to request from manager, use previous " << 
groupid2cluster.first);
+          continue;
+        }
+        if (!SdkConfig::getInstance()->enable_local_cache_) {
+          LOG_WARN("failed to request from manager, forbid local cache!");
+          continue;
+        }
+        meta_data = RecoverFromLocalCache(groupid2cluster.first);
+        if (meta_data.empty()) {
+          LOG_WARN("local cache is empty!");
+          continue;
+        }
+      }
+
       ProxyInfoVec proxyInfoVec;
       ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
       if (ret != SdkCode::kSuccess) {
@@ -117,6 +135,7 @@ void ProxyManager::DoUpdate() {
       if (!proxyInfoVec.empty()) {
         unique_write_lock<read_write_mutex> 
wtlck(groupid_2_proxy_map_rwmutex_);
         groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
+        cache_proxy_info_[groupid2cluster.first] = meta_data;
         LOG_INFO("groupid:" << groupid2cluster.first << " success update "
                             << proxyInfoVec.size() << " proxy-ip.");
       }
@@ -127,6 +146,10 @@ void ProxyManager::DoUpdate() {
 
   UpdateClusterId2ProxyMap();
 
+  if (SdkConfig::getInstance()->enable_local_cache_) {
+    WriteLocalCache();
+  }
+
   update_mutex_.unlock();
   LOG_INFO("finish ProxyManager DoUpdate.");
 }
@@ -343,4 +366,79 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() {
                                                  << it.second);
   }
 }
+
+void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, 
const std::string &groupid,
+                                     const std::string &meta_data) {
+  file << "[groupid" << groupid_index << "]" << std::endl;
+  file << "groupid=" << groupid << std::endl;
+  file << "proxy_cfg=" << meta_data << std::endl;
+}
+
+void ProxyManager::ReadLocalCache() {
+  try {
+    IniFile ini = IniFile();
+    if (ini.load(constants::kCacheFile)) {
+      LOG_INFO("there is no bus list cache file");
+      return;
+    }
+    int32_t groupid_count = 0;
+    if (ini.getInt("main", "groupid_count", &groupid_count)) {
+      LOG_WARN("failed to parse .proxy list.ini file");
+      return;
+    }
+    for (int32_t i = 0; i < groupid_count; i++) {
+      std::string groupid_list = "groupid" + std::to_string(i);
+      std::string groupid, proxy;
+      if (ini.getString(groupid_list, "groupid", &groupid)) {
+        LOG_WARN("failed to get from cache file." << groupid);
+        continue;
+      }
+      if (ini.getString(groupid_list, "proxy_cfg", &proxy)) {
+        LOG_WARN("failed to get cache proxy list" << groupid);
+        continue;
+      }
+      LOG_INFO("read cache file, id:" << groupid << ", local config:" << 
proxy);
+      cache_proxy_info_[groupid] = proxy;
+    }
+  }catch (...){
+    LOG_ERROR("ReadLocalCache error!");
+  }
+}
+
+void ProxyManager::WriteLocalCache() {
+  int32_t groupid_count = 0;
+  try {
+    std::ofstream outfile;
+    outfile.open(constants::kCacheTmpFile, std::ios::out | std::ios::trunc);
+
+    for (auto &it : cache_proxy_info_) {
+      BuildLocalCache(outfile, groupid_count, it.first, it.second);
+      groupid_count++;
+    }
+    if (outfile) {
+      if (groupid_count) {
+        outfile << "[main]" << std::endl;
+        outfile << "groupid_count=" << groupid_count << std::endl;
+      }
+      outfile.close();
+    }
+    if (groupid_count) {
+      rename(constants::kCacheTmpFile, constants::kCacheFile);
+    }
+  } catch (...) {
+    LOG_ERROR("WriteLocalCache error!");
+  }
+  LOG_INFO("WriteLocalCache bid number:" << groupid_count);
+}
+
+std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) {
+  std::string meta_data;
+  auto it = cache_proxy_info_.find(groupid);
+  if (it != cache_proxy_info_.end()) {
+    meta_data = it->second;
+  }
+  LOG_INFO("RecoverFromLocalCache:" << groupid << ",local cache:" << 
meta_data);
+  return meta_data;
+}
+
 } // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 653185798c..6e1c58ef39 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -50,6 +50,7 @@ private:
   bool exit_flag_;
   std::thread update_conf_thread_;
   volatile bool inited_ = false;
+  std::unordered_map<std::string, std::string> cache_proxy_info_;
 
   int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
                       ProxyInfoVec &proxy_info_vec);
@@ -72,6 +73,10 @@ public:
   bool CheckClusterId(const std::string &cluster_id);
   void UpdateClusterId2ProxyMap();
   void UpdateGroupid2ClusterIdMap();
+  void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const 
std::string &groupid, const std::string &meta_data);
+  void ReadLocalCache();
+  void WriteLocalCache();
+  std::string RecoverFromLocalCache(const std::string&groupid);
 };
 } // namespace inlong
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index eb17f56313..fe25ee7861 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -46,6 +46,7 @@ static const int32_t kDispatchIntervalSend = 10;
 static const int32_t kLoadBalanceInterval = 300000;
 static const int32_t kHeartBeatInterval = 60000;
 static const bool kEnableBalance = true;
+static const bool kEnableLocalCache = true;
 
 static const bool kEnablePack = true;
 static const uint32_t kPackSize = 409600;
@@ -101,6 +102,9 @@ static const int32_t kWeight[30] = {1,  1,  1,  1,  1,  2,  
2,  2,   2,   2,
                                     3,  3,  3,  3,  3,  6,  6,  6,   6,   6,
                                     12, 12, 12, 12, 12, 48, 96, 192, 384, 
1000};
 
+static const char kCacheFile[] = ".proxy_list.ini";
+static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
+
 } // namespace constants
 } // namespace inlong
 #endif // INLONG_SDK_CONSTANT_H
\ No newline at end of file

Reply via email to