This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ad34b4a024 [INLONG-9213][SDK] Support isolation by inlong groupid 
(#9218)
ad34b4a024 is described below

commit ad34b4a024a29938343fc622a18e23b38dbb7f0c
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Sun Nov 5 13:58:39 2023 +0800

    [INLONG-9213][SDK] Support isolation by inlong groupid (#9218)
---
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h       |   1 +
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |  11 ++-
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 108 +++++++++++++++++----
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  22 ++++-
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  |  42 ++++----
 .../dataproxy-sdk-cpp/src/manager/send_manager.h   |  11 +--
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   1 +
 7 files changed, 148 insertions(+), 48 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index deb11f99ed..4407869ba7 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -79,6 +79,7 @@ public:
   uint32_t manager_url_timeout_;     // URL parsing timeout, seconds
   uint32_t max_proxy_num_;
   uint32_t msg_type_;
+  bool enable_isolation_;
 
   // Network parameters
   bool enable_tcp_nagle_;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 340b6f1e66..3620d8985e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -84,7 +84,7 @@ void SdkConfig::defaultInit() {
   recv_buf_size_ = constants::kRecvBufSize;
   max_msg_size_ = constants::kExtPackSize;
   max_group_id_num_ = constants::kMaxGroupIdNum;
-  max_stream_id_num_=constants::kMaxStreamIdNum;
+  max_stream_id_num_ = constants::kMaxStreamIdNum;
 
   // Packaging parameters
   enable_pack_ = constants::kEnablePack;
@@ -106,6 +106,7 @@ void SdkConfig::defaultInit() {
   manager_update_interval_ = constants::kManagerUpdateInterval;
   manager_url_timeout_ = constants::kManagerTimeout;
   max_proxy_num_ = constants::kMaxProxyNum;
+  enable_isolation_ = constants::kEnableIsolation;
 
   local_ip_ = constants::kSerIP;
   local_port_ = constants::kSerPort;
@@ -325,6 +326,13 @@ void SdkConfig::InitManagerParam(const rapidjson::Value 
&doc) {
     std::string inlong_group_ids_str = obj.GetString();
     Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ",");
   }
+  // enable isolation
+  if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) {
+    const rapidjson::Value &obj = doc["enable_isolation"];
+    enable_isolation_ = obj.GetBool();
+  } else {
+    enable_isolation_ = constants::kEnableIsolation;
+  }
 }
 
 void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
@@ -460,6 +468,7 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("auth_key: " << auth_key_.c_str());
   LOG_INFO("max_group_id_num: " << max_group_id_num_);
   LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
+  LOG_INFO("enable_isolation: " << enable_isolation_);
 }
 
 } // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 5b5d13055a..abe33dde21 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -69,15 +69,15 @@ void ProxyManager::DoUpdate() {
 
   std::srand(unsigned(std::time(nullptr)));
 
-  if (groupid_2_cluster_map_.empty()) {
+  if (groupid_2_cluster_id_map_.empty()) {
     LOG_INFO("empty groupid, no need to DoUpdate proxy list");
     update_mutex_.unlock();
     return;
   }
 
   {
-    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
-    for (auto &groupid2cluster : groupid_2_cluster_map_) {
+    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+    for (auto &groupid2cluster : groupid_2_cluster_id_map_) {
       std::string url;
       if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
         url = SdkConfig::getInstance()->manager_cluster_url_;
@@ -122,6 +122,11 @@ void ProxyManager::DoUpdate() {
       }
     }
   }
+
+  UpdateGroupid2ClusterIdMap();
+
+  UpdateClusterId2ProxyMap();
+
   update_mutex_.unlock();
   LOG_INFO("finish ProxyManager DoUpdate.");
 }
@@ -174,7 +179,8 @@ int32_t ProxyManager::ParseAndGet(const std::string 
&inlong_group_id,
               << inlong_group_id.c_str());
     return SdkCode::kErrorParseJson;
   }
-
+  groupid_2_cluster_id_update_map_[inlong_group_id] =
+      clusterInfo["clusterId"].GetInt();
   // proxy list
   for (auto &proxy : nodeList.GetArray()) {
     std::string ip;
@@ -212,31 +218,28 @@ int32_t ProxyManager::ParseAndGet(const std::string 
&inlong_group_id,
   return SdkCode::kSuccess;
 }
 
-int32_t ProxyManager::GetProxy(const std::string &groupid,
+int32_t ProxyManager::GetProxy(const std::string &key,
                                ProxyInfoVec &proxy_info_vec) {
-  unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
-  auto it = groupid_2_proxy_map_.find(groupid);
-  if (it == groupid_2_proxy_map_.end()) {
-    LOG_ERROR("GetProxyByGroupid  failed . Groupid " << groupid);
-    return SdkCode::kFailGetProxyConf;
+  if (SdkConfig::getInstance()->enable_isolation_) {
+    return GetProxyByGroupid(key, proxy_info_vec);
+  } else {
+    return GetProxyByClusterId(key, proxy_info_vec);
   }
-  proxy_info_vec = it->second;
-  return SdkCode::kSuccess;
 }
 
 int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
                                    bool is_inited) {
   {
-    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
-    auto it = groupid_2_cluster_map_.find(inlong_group_id);
-    if (it != groupid_2_cluster_map_.end()) {
+    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+    auto it = groupid_2_cluster_id_map_.find(inlong_group_id);
+    if (it != groupid_2_cluster_id_map_.end()) {
       return SdkCode::kSuccess;
     }
   }
 
   {
-    unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_rwmutex_);
-    groupid_2_cluster_map_.emplace(inlong_group_id, -1);
+    unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_id_rwmutex_);
+    groupid_2_cluster_id_map_.emplace(inlong_group_id, "");
   }
 
   LOG_INFO("CheckProxyConf groupid:" << inlong_group_id
@@ -252,12 +255,81 @@ int32_t ProxyManager::CheckBidConf(const std::string 
&inlong_group_id,
   return SdkCode::kSuccess;
 }
 
-bool ProxyManager::IsExist(const std::string &inlong_group_id) {
+bool ProxyManager::HasProxy(const std::string &inlong_group_id) {
+  if (SdkConfig::getInstance()->enable_isolation_) {
+    return CheckGroupid(inlong_group_id);
+  } else {
+    return CheckClusterId(inlong_group_id);
+  }
+}
+int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id,
+                                        ProxyInfoVec &proxy_info_vec) {
   unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
   auto it = groupid_2_proxy_map_.find(inlong_group_id);
+  if (it == groupid_2_proxy_map_.end()) {
+    LOG_ERROR("GetProxy failed! inlong_group_id: " << inlong_group_id);
+    return SdkCode::kFailGetConn;
+  }
+  proxy_info_vec = it->second;
+  return SdkCode::kSuccess;
+}
+int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id,
+                                          ProxyInfoVec &proxy_info_vec) {
+  unique_read_lock<read_write_mutex> rdlck(clusterid_2_proxy_map_rwmutex_);
+  auto it = cluster_id_2_proxy_map_.find(cluster_id);
+  if (it == cluster_id_2_proxy_map_.end()) {
+    LOG_ERROR("GetProxy failed! cluster_id:" << cluster_id);
+    return SdkCode::kFailGetConn;
+  }
+  proxy_info_vec = it->second;
+  return SdkCode::kSuccess;
+}
+std::string ProxyManager::GetSendGroupKey(const std::string &groupid) {
+  if (SdkConfig::getInstance()->enable_isolation_) {
+    return groupid;
+  }
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+  auto it = groupid_2_cluster_id_map_.find(groupid);
+  if (it == groupid_2_cluster_id_map_.end()) {
+    return "";
+  }
+  return it->second;
+}
+bool ProxyManager::CheckGroupid(const std::string &groupid) {
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
+  auto it = groupid_2_proxy_map_.find(groupid);
   if (it == groupid_2_proxy_map_.end()) {
     return false;
   }
   return true;
 }
+bool ProxyManager::CheckClusterId(const std::string &cluster_id) {
+  unique_read_lock<read_write_mutex> rdlck(clusterid_2_proxy_map_rwmutex_);
+  auto it = cluster_id_2_proxy_map_.find(cluster_id);
+  if (it == cluster_id_2_proxy_map_.end()) {
+    return false;
+  }
+  return true;
+}
+void ProxyManager::UpdateClusterId2ProxyMap() {
+  if (SdkConfig::getInstance()->enable_isolation_) {
+    return;
+  }
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+  for (const auto &it : groupid_2_cluster_id_update_map_) {
+    ProxyInfoVec proxy_info_vec;
+    if (GetProxyByGroupid(it.first, proxy_info_vec) == SdkCode::kSuccess) {
+      unique_write_lock<read_write_mutex> 
wtlck(clusterid_2_proxy_map_rwmutex_);
+      cluster_id_2_proxy_map_[std::to_string(it.second)] = proxy_info_vec;
+    }
+  }
+}
+void ProxyManager::UpdateGroupid2ClusterIdMap() {
+  unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_id_rwmutex_);
+  for (const auto &it : groupid_2_cluster_id_update_map_) {
+    groupid_2_cluster_id_map_[it.first] = std::to_string(it.second);
+    LOG_INFO("UpdateGroup2ClusterIdMap groupid:" << it.first << " ,cluster id:"
+                                                 << it.second);
+  }
+}
 } // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index b3f46bc68d..18856f61ec 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -31,11 +31,17 @@ class ProxyManager {
 private:
   static ProxyManager *instance_;
   uint32_t timeout_;
-  read_write_mutex groupid_2_cluster_rwmutex_;
+  read_write_mutex groupid_2_cluster_id_rwmutex_;
   read_write_mutex groupid_2_proxy_map_rwmutex_;
+  read_write_mutex clusterid_2_proxy_map_rwmutex_;
+
+  std::unordered_map<std::string, std::string> groupid_2_cluster_id_map_;
+  std::unordered_map<std::string, int32_t> groupid_2_cluster_id_update_map_;
 
-  std::unordered_map<std::string, int32_t> groupid_2_cluster_map_;
   std::unordered_map<std::string, ProxyInfoVec> groupid_2_proxy_map_;
+  std::unordered_map<std::string, ProxyInfoVec>
+      cluster_id_2_proxy_map_; //<cluster_id,busList>
+
   bool update_flag_;
   std::mutex cond_mutex_;
   std::mutex update_mutex_;
@@ -45,7 +51,7 @@ private:
   std::thread update_conf_thread_;
   volatile bool inited_ = false;
 
-  int32_t ParseAndGet(const std::string &groupid, const std::string &meta_data,
+  int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
                       ProxyInfoVec &proxy_info_vec);
 
 public:
@@ -57,7 +63,15 @@ public:
   void DoUpdate();
   void Init();
   int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
-  bool IsExist(const std::string &inlong_group_id);
+  int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec 
&proxy_info_vec);
+  int32_t GetProxyByClusterId(const std::string &cluster_id,
+                              ProxyInfoVec &proxy_info_vec);
+  std::string GetSendGroupKey(const std::string &groupid);
+  bool HasProxy(const std::string &inlong_group_id);
+  bool CheckGroupid(const std::string &groupid);
+  bool CheckClusterId(const std::string &cluster_id);
+  void UpdateClusterId2ProxyMap();
+  void UpdateGroupid2ClusterIdMap();
 };
 } // namespace inlong
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 24b74c2bf2..475fe14b3f 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -28,48 +28,52 @@ SendManager::SendManager() : send_group_idx_(0) {
              << SdkConfig::getInstance()->inlong_group_ids_[i]
              << " send group num:"
              << SdkConfig::getInstance()->per_groupid_thread_nums_);
-    DoAddSendGroup(SdkConfig::getInstance()->inlong_group_ids_[i]);
+    std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey(
+        SdkConfig::getInstance()->inlong_group_ids_[i]);
+    AddSendGroup(send_group_key);
   }
 }
 
 SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
-  SendGroupPtr send_group_ptr = DoGetSendGroup(group_id);
+  std::string send_group_key =
+      ProxyManager::GetInstance()->GetSendGroupKey(group_id);
+  SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key);
   if (send_group_ptr == nullptr) {
-    AddSendGroup(group_id);
+    AddSendGroup(send_group_key);
   }
   return send_group_ptr;
 }
 
-bool SendManager::AddSendGroup(const std::string &inlong_group_id) {
-  if (!ProxyManager::GetInstance()->IsExist(inlong_group_id)) {
-    LOG_ERROR("inlong_group_id is not exist." << inlong_group_id);
+bool SendManager::AddSendGroup(const std::string &send_group_key) {
+  if (!ProxyManager::GetInstance()->HasProxy(send_group_key)) {
+    LOG_ERROR("inlong_group_id is not exist." << send_group_key);
     return false;
   }
-  DoAddSendGroup(inlong_group_id);
+  DoAddSendGroup(send_group_key);
   return false;
 }
 
-void SendManager::DoAddSendGroup(const std::string &group_id) {
-  unique_write_lock<read_write_mutex> 
wtlck(group_id_2_send_group_map_rwmutex_);
-  auto send_group_map = group_id_2_send_group_map_.find(group_id);
-  if (send_group_map != group_id_2_send_group_map_.end()) {
-    LOG_WARN("send group has exist." << group_id);
+void SendManager::DoAddSendGroup(const std::string &send_group_key) {
+  unique_write_lock<read_write_mutex> wtlck(send_group_map_rwmutex_);
+  auto send_group_map = send_group_map_.find(send_group_key);
+  if (send_group_map != send_group_map_.end()) {
+    LOG_WARN("send group has exist." << send_group_key);
     return;
   }
   std::vector<SendGroupPtr> send_group;
   send_group.reserve(SdkConfig::getInstance()->per_groupid_thread_nums_);
   for (int32_t j = 0; j < SdkConfig::getInstance()->per_groupid_thread_nums_;
        j++) {
-    send_group.push_back(std::make_shared<SendGroup>(group_id));
+    send_group.push_back(std::make_shared<SendGroup>(send_group_key));
   }
-  group_id_2_send_group_map_[group_id] = send_group;
+  send_group_map_[send_group_key] = send_group;
 }
 
-SendGroupPtr SendManager::DoGetSendGroup(const std::string &group_id) {
-  unique_read_lock<read_write_mutex> rdlck(group_id_2_send_group_map_rwmutex_);
-  auto send_group_map = group_id_2_send_group_map_.find(group_id);
-  if (send_group_map == group_id_2_send_group_map_.end()) {
-    LOG_ERROR("fail to get send group, group_id:%s" << group_id);
+SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) {
+  unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_);
+  auto send_group_map = send_group_map_.find(send_group_key);
+  if (send_group_map == send_group_map_.end()) {
+    LOG_ERROR("fail to get send group, group_id:%s" << send_group_key);
     return nullptr;
   }
   if (send_group_map->second.empty()) {
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
index 6ee55d4fb4..fa627647f2 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
@@ -29,18 +29,17 @@ using namespace inlong;
 
 class SendManager : noncopyable {
 private:
-  read_write_mutex group_id_2_send_group_map_rwmutex_;
-  std::unordered_map<std::string, std::vector<SendGroupPtr>>
-      group_id_2_send_group_map_;
-  SendGroupPtr DoGetSendGroup(const std::string &group_id);
-  void DoAddSendGroup(const std::string &group_id);
+  read_write_mutex send_group_map_rwmutex_;
+  std::unordered_map<std::string, std::vector<SendGroupPtr>> send_group_map_;
+  SendGroupPtr DoGetSendGroup(const std::string &send_group_key);
+  void DoAddSendGroup(const std::string &send_group_key);
   volatile uint32_t send_group_idx_;
 
 public:
   SendManager();
   virtual ~SendManager(){};
   SendGroupPtr GetSendGroup(const std::string &group_id);
-  bool AddSendGroup(const std::string &inlong_group_id);
+  bool AddSendGroup(const std::string &send_group_key);
 };
 } // namespace inlong
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 274c3161e2..b2af191112 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -89,6 +89,7 @@ static const bool kNeedAuth = false;
 
 static const uint32_t kMaxAttrLen = 2048;
 const uint32_t ATTR_LENGTH = 10;
+static const bool kEnableIsolation = false;
 
 } // namespace constants
 } // namespace inlong

Reply via email to