This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 912dce3b2e [INLONG-9378][SDK] Optimize proxy configuration update (#9381)
912dce3b2e is described below

commit 912dce3b2ed886b28b4531d739d8774a3c698c56
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Fri Dec 1 18:37:13 2023 +0800

    [INLONG-9378][SDK] Optimize proxy configuration update (#9381)
---
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 197 +++++++++++++--------
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  15 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   5 +-
 3 files changed, 140 insertions(+), 77 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 8c2dfc4c1e..e3771db99e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -27,6 +27,7 @@
 #include <rapidjson/document.h>
 
 namespace inlong {
+const uint64_t MINUTE = 60000;
 ProxyManager *ProxyManager::instance_ = new ProxyManager();
 ProxyManager::~ProxyManager() {
   if (update_conf_thread_.joinable()) {
@@ -41,6 +42,7 @@ ProxyManager::~ProxyManager() {
 }
 void ProxyManager::Init() {
   timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
+  last_update_time_ = Utils::getCurrentMsTime();
   if (__sync_bool_compare_and_swap(&inited_, false, true)) {
     update_conf_thread_ = std::thread(&ProxyManager::Update, this);
   }
@@ -65,8 +67,11 @@ void ProxyManager::Update() {
   LOG_INFO("proxylist DoUpdate thread exit");
 }
 void ProxyManager::DoUpdate() {
-  update_mutex_.try_lock();
   LOG_INFO("start ProxyManager DoUpdate.");
+  if (!update_mutex_.try_lock()) {
+    LOG_INFO("DoUpdate try_lock. " << getpid());
+    return;
+  }
 
   std::srand(unsigned(std::time(nullptr)));
 
@@ -76,80 +81,30 @@ void ProxyManager::DoUpdate() {
     return;
   }
 
-  {
-    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
-    for (auto &groupid2cluster : groupid_2_cluster_id_map_) {
-      std::string url;
-      if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
-        url = SdkConfig::getInstance()->manager_cluster_url_;
-      else {
-        url = SdkConfig::getInstance()->manager_url_ + "/" +
-              groupid2cluster.first;
-      }
-      std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
-                              "&version=" + constants::kVersion +
-                              "&protocolType=" + constants::kProtocolType;
-      LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
-                                      << "proxy cfg url " << url.c_str()
-                                      << "post_data:" << post_data.c_str());
-
-      std::string meta_data;
-      int32_t ret;
-      std::string urlByDNS;
-      for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
-        HttpRequest request = {url,
-                               timeout_,
-                               SdkConfig::getInstance()->need_auth_,
-                               SdkConfig::getInstance()->auth_id_,
-                               SdkConfig::getInstance()->auth_key_,
-                               post_data};
-        ret = Utils::requestUrl(meta_data, &request);
-        if (!ret) {
-          break;
-        } // request success
-      }
-
-      if (ret != SdkCode::kSuccess) {
-        if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) {
-          LOG_WARN("failed to request from manager, use previous " << groupid2cluster.first);
-          continue;
-        }
-        if (!SdkConfig::getInstance()->enable_local_cache_) {
-          LOG_WARN("failed to request from manager, forbid local cache!");
-          continue;
-        }
-        meta_data = RecoverFromLocalCache(groupid2cluster.first);
-        if (meta_data.empty()) {
-          LOG_WARN("local cache is empty!");
-          continue;
-        }
-      }
+  int retry = constants::MAX_RETRY;
+  do {
+    std::unordered_map<std::string, std::string> bid_2_cluster_id =
+        BuildGroupId2ClusterId();
 
-      ProxyInfoVec proxyInfoVec;
-      ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
-      if (ret != SdkCode::kSuccess) {
-        LOG_ERROR("failed to parse groupid:%s json proxy list "
-                  << groupid2cluster.first.c_str());
-        continue;
-      }
-      if (!proxyInfoVec.empty()) {
-        unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
-        groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
-        cache_proxy_info_[groupid2cluster.first] = meta_data;
-        LOG_INFO("groupid:" << groupid2cluster.first << " success update "
-                            << proxyInfoVec.size() << " proxy-ip.");
-      }
-    }
-  }
+    UpdateProxy(bid_2_cluster_id);
 
-  UpdateGroupid2ClusterIdMap();
+    UpdateGroupid2ClusterIdMap();
 
-  UpdateClusterId2ProxyMap();
+    UpdateClusterId2ProxyMap();
+    uint64_t id_count = GetGroupIdCount();
+    if (bid_2_cluster_id.size() == id_count) {
+      break;
+    }
+    LOG_INFO("retry DoUpdate!. update size:" << bid_2_cluster_id.size()
+                                             << " != latest size:" << id_count);
+  } while (retry--);
 
   if (SdkConfig::getInstance()->enable_local_cache_) {
     WriteLocalCache();
   }
 
+  last_update_time_ = Utils::getCurrentMsTime();
+
   update_mutex_.unlock();
   LOG_INFO("finish ProxyManager DoUpdate.");
 }
@@ -254,7 +209,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
 
 int32_t ProxyManager::GetProxy(const std::string &key,
                                ProxyInfoVec &proxy_info_vec) {
-  if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) {
+  if (constants::IsolationLevel::kLevelOne ==
+      SdkConfig::getInstance()->isolation_level_) {
     return GetProxyByGroupid(key, proxy_info_vec);
   }
   return GetProxyByClusterId(key, proxy_info_vec);
@@ -360,8 +316,9 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() {
   }
 }
 
-void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid,
-                                     const std::string &meta_data) {
+void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index,
+                                   const std::string &groupid,
+                                   const std::string &meta_data) {
   file << "[groupid" << groupid_index << "]" << std::endl;
   file << "groupid=" << groupid << std::endl;
   file << "proxy_cfg=" << meta_data << std::endl;
@@ -393,7 +350,7 @@ void ProxyManager::ReadLocalCache() {
       LOG_INFO("read cache file, id:" << groupid << ", local config:" << proxy);
       cache_proxy_info_[groupid] = proxy;
     }
-  }catch (...){
+  } catch (...) {
     LOG_ERROR("ReadLocalCache error!");
   }
 }
@@ -441,4 +398,102 @@ std::string ProxyManager::GetClusterID(const std::string &groupid) {
   }
   return it->second;
 }
+
+void ProxyManager::UpdateProxy(
+    std::unordered_map<std::string, std::string> &group_id_2_cluster_id) {
+  for (auto &groupid2cluster : group_id_2_cluster_id) {
+    if (SkipUpdate(groupid2cluster.first)) {
+      LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
+      continue;
+    }
+    std::string url;
+    if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
+      url = SdkConfig::getInstance()->manager_cluster_url_;
+    else {
+      url =
+          SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
+    }
+    std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
+                            "&version=" + constants::kVersion +
+                            "&protocolType=" + constants::kProtocolType;
+    LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
+                                    << "proxy cfg url " << url.c_str()
+                                    << "post_data:" << post_data.c_str());
+
+    std::string meta_data;
+    int32_t ret;
+    std::string urlByDNS;
+    for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
+      HttpRequest request = {url,
+                             timeout_,
+                             SdkConfig::getInstance()->need_auth_,
+                             SdkConfig::getInstance()->auth_id_,
+                             SdkConfig::getInstance()->auth_key_,
+                             post_data};
+      ret = Utils::requestUrl(meta_data, &request);
+      if (!ret) {
+        break;
+      } // request success
+    }
+
+    if (ret != SdkCode::kSuccess) {
+      if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
+          groupid_2_proxy_map_.end()) {
+        LOG_WARN("failed to request from manager, use previous "
+                 << groupid2cluster.first);
+        continue;
+      }
+      if (!SdkConfig::getInstance()->enable_local_cache_) {
+        LOG_WARN("failed to request from manager, forbid local cache!");
+        continue;
+      }
+      meta_data = RecoverFromLocalCache(groupid2cluster.first);
+      if (meta_data.empty()) {
+        LOG_WARN("local cache is empty!");
+        continue;
+      }
+    }
+
+    ProxyInfoVec proxyInfoVec;
+    ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
+    if (ret != SdkCode::kSuccess) {
+      LOG_ERROR("failed to parse groupid:%s json proxy list "
+                << groupid2cluster.first.c_str());
+      continue;
+    }
+    if (!proxyInfoVec.empty()) {
+      unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
+      groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
+      cache_proxy_info_[groupid2cluster.first] = meta_data;
+      LOG_INFO("groupid:" << groupid2cluster.first << " success update "
+                          << proxyInfoVec.size() << " proxy-ip.");
+    }
+  }
+}
+std::unordered_map<std::string, std::string>
+ProxyManager::BuildGroupId2ClusterId() {
+  std::unordered_map<std::string, std::string> bid_2_cluster_id_map_tmp;
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+  for (auto &bid2cluster : groupid_2_cluster_id_map_) {
+    bid_2_cluster_id_map_tmp.insert(bid2cluster);
+  }
+  return bid_2_cluster_id_map_tmp;
+}
+
+uint64_t ProxyManager::GetGroupIdCount() {
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+  return groupid_2_cluster_id_map_.size();
+}
+
+bool ProxyManager::SkipUpdate(const std::string &group_id) {
+  uint64_t current_time = Utils::getCurrentMsTime();
+  uint64_t diff = current_time - last_update_time_;
+  uint64_t threshold =
+      SdkConfig::getInstance()->manager_update_interval_ * MINUTE;
+  bool ret = CheckGroupid(group_id);
+  if (diff < threshold && ret) {
+    return true;
+  }
+  return false;
+}
 } // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 8ae859869f..f07333c989 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -51,6 +51,7 @@ private:
   std::thread update_conf_thread_;
   volatile bool inited_ = false;
   std::unordered_map<std::string, std::string> cache_proxy_info_;
+  uint64_t last_update_time_;
 
   int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
                       ProxyInfoVec &proxy_info_vec);
@@ -64,7 +65,8 @@ public:
   void DoUpdate();
   void Init();
   int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
-  int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec);
+  int32_t GetProxyByGroupid(const std::string &inlong_group_id,
+                            ProxyInfoVec &proxy_info_vec);
   int32_t GetProxyByClusterId(const std::string &cluster_id,
                               ProxyInfoVec &proxy_info_vec);
   std::string GetGroupKey(const std::string &groupid);
@@ -73,11 +75,18 @@ public:
   bool CheckClusterId(const std::string &cluster_id);
   void UpdateClusterId2ProxyMap();
   void UpdateGroupid2ClusterIdMap();
-  void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, const std::string &meta_data);
+  void BuildLocalCache(std::ofstream &file, int32_t groupid_index,
+                       const std::string &groupid,
+                       const std::string &meta_data);
   void ReadLocalCache();
   void WriteLocalCache();
-  std::string RecoverFromLocalCache(const std::string&groupid);
+  std::string RecoverFromLocalCache(const std::string &groupid);
   std::string GetClusterID(const std::string &groupid);
+  void UpdateProxy(
+      std::unordered_map<std::string, std::string> &group_id_2_cluster_id);
+  std::unordered_map<std::string, std::string> BuildGroupId2ClusterId();
+  uint64_t GetGroupIdCount();
+  bool SkipUpdate(const std::string &group_id);
 };
 } // namespace inlong
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index eaa2821263..1dbebd03db 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -25,9 +25,7 @@
 
 namespace inlong {
 namespace constants {
-enum IsolationLevel{
-  kLevelOne =1, kLevelSecond =2, kLevelThird =3
-};
+enum IsolationLevel { kLevelOne = 1, kLevelSecond = 2, kLevelThird = 3 };
 static const int32_t kMaxRequestTDMTimes = 4;
 static const char kAttrFormat[] =
     "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";
@@ -107,6 +105,7 @@ static const int32_t kWeight[30] = {1,  1,  1,  1,  1,  2,  2,  2,   2,   2,
 
 static const char kCacheFile[] = ".proxy_list.ini";
 static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
+const int MAX_RETRY = 10;
 
 } // namespace constants
 } // namespace inlong

Reply via email to