This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7178b39a3f [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)
7178b39a3f is described below

commit 7178b39a3fd30d1af9a26d5259f72b727167c519
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Nov 29 18:21:10 2023 +0800

    [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)
---
 .../dataproxy-sdk-cpp/release/demo/send_demo.cc    |  2 +-
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h       |  3 +-
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       | 19 +++++-----
 .../dataproxy-sdk-cpp/src/group/recv_group.cc      | 29 +++++++++++++---
 .../dataproxy-sdk-cpp/src/group/recv_group.h       |  1 +
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 40 +++++++++++-----------
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  3 +-
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  | 24 ++++++-------
 .../dataproxy-sdk-cpp/src/manager/send_manager.h   |  1 +
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |  3 ++
 10 files changed, 76 insertions(+), 49 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
index a4a783e0dc..b09f8463f1 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
@@ -53,7 +53,7 @@ int main(int argc, char const *argv[]) {
   cout << "---->start sdk successfully" << endl;
 
   int count = 1000;
-  string inlong_group_id = "test_cpp_sdk_20230404";
+  string inlong_group_id = "test_cpp_sdk";
   string inlong_stream_id = "stream1";
   if (4 == argc) {
     inlong_group_id = argv[2];
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 578278c5fd..6d7b23dc21 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -83,7 +83,7 @@ private:
   uint64_t max_proxy_num_;
   uint64_t reserve_proxy_num_;
   uint32_t msg_type_;
-  bool enable_isolation_;
+  uint32_t isolation_level_;
 
   // Network parameters
   bool enable_tcp_nagle_;
@@ -92,6 +92,7 @@ private:
   bool enable_balance_;
   bool enable_local_cache_;
 
+
   // auth settings
   bool need_auth_;
   std::string auth_id_;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index e9d679cd8c..319262adc2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -92,6 +92,7 @@ void SdkConfig::defaultInit() {
   load_balance_interval_ = constants::kLoadBalanceInterval;
   heart_beat_interval_ = constants::kHeartBeatInterval;
   enable_balance_ = constants::kEnableBalance;
+  isolation_level_=constants::IsolationLevel::kLevelSecond;
 
   // cache parameter
   send_buf_size_ = constants::kSendBufSize;
@@ -120,7 +121,6 @@ void SdkConfig::defaultInit() {
   manager_update_interval_ = constants::kManagerUpdateInterval;
   manager_url_timeout_ = constants::kManagerTimeout;
   max_proxy_num_ = constants::kMaxProxyNum;
-  enable_isolation_ = constants::kEnableIsolation;
   reserve_proxy_num_ = constants::kReserveProxyNum;
   enable_local_cache_ = constants::kEnableLocalCache;
 
@@ -360,13 +360,6 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) {
     std::string inlong_group_ids_str = obj.GetString();
     Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ",");
   }
-  // enable isolation
-  if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) {
-    const rapidjson::Value &obj = doc["enable_isolation"];
-    enable_isolation_ = obj.GetBool();
-  } else {
-    enable_isolation_ = constants::kEnableIsolation;
-  }
 
   // enable local cache
   if (doc.HasMember("enable_local_cache") && doc["enable_local_cache"].IsBool()) {
@@ -375,6 +368,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) {
   } else {
     enable_local_cache_ = constants::kEnableLocalCache;
   }
+
+  // isolation level
+  if (doc.HasMember("isolation_level") && doc["isolation_level"].IsInt() && doc["isolation_level"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["isolation_level"];
+    isolation_level_ = obj.GetInt();
+  } else {
+    isolation_level_ = constants::IsolationLevel::kLevelSecond;
+  }
 }
 
 void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
@@ -564,7 +565,7 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("auth_key: " << auth_key_.c_str());
   LOG_INFO("max_group_id_num: " << max_group_id_num_);
   LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
-  LOG_INFO("enable_isolation: " << enable_isolation_);
+  LOG_INFO("isolation_level: " << isolation_level_);
 }
 
 } // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 79931bd434..b852095f68 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -26,11 +26,13 @@
 
 namespace inlong {
 const uint32_t DEFAULT_PACK_ATTR = 400;
+const uint64_t LOG_SAMPLE=100;
 RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager)
     : cur_len_(0), groupId_num_(0), streamId_num_(0),
       msg_type_(SdkConfig::getInstance()->msg_type_),
       data_capacity_(SdkConfig::getInstance()->buf_size_),
-      send_manager_(send_manager),group_key_(group_key) {
+      send_manager_(send_manager),group_key_(group_key),
+      log_stat_(0){
   data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
                             SdkConfig::getInstance()->pack_size_);
   data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
@@ -70,22 +72,41 @@ int32_t RecvGroup::DoDispatchMsg() {
   last_pack_time_ = Utils::getCurrentMsTime();
   std::lock_guard<std::mutex> lck(mutex_);
   if (group_key_.empty()) {
-    LOG_ERROR("groupId  is empty, check!!");
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("groupId  is empty, check!!");
+      log_stat_ = 0;
+    }
     return SdkCode::kInvalidInput;
   }
   if (msgs_.empty()) {
-    LOG_ERROR("no msg in msg_set, check!");
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("no msg in msg_set, check!");
+      log_stat_ = 0;
+    }
     return SdkCode::kMsgEmpty;
   }
   auto send_group = send_manager_->GetSendGroup(group_key_);
   if (send_group == nullptr) {
-    LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
+      log_stat_ = 0;
+    }
     return SdkCode::kFailGetSendBuf;
   }
   if (!send_group->IsAvailable()) {
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send group! group_key:"
+                << group_key_ << " send group is not available!");
+      log_stat_ = 0;
+    }
     return SdkCode::kFailGetConn;
   }
   if (send_group->IsFull()) {
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send group! group_key:"
+                << group_key_ << " send group is full!");
+      log_stat_ = 0;
+    }
     return SdkCode::kSendBufferFull;
   }
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index 2d1a9e6315..58d3dc70a3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -48,6 +48,7 @@ private:
 
   uint64_t max_recv_size_;
   std::string group_key_;
+  uint64_t log_stat_;
 
   int32_t DoDispatchMsg();
   void AddMsg(const std::string &msg, std::string client_ip,
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 1c5f440119..8c2dfc4c1e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -254,11 +254,10 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
 
 int32_t ProxyManager::GetProxy(const std::string &key,
                                ProxyInfoVec &proxy_info_vec) {
-  if (SdkConfig::getInstance()->enable_isolation_) {
+  if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) {
     return GetProxyByGroupid(key, proxy_info_vec);
-  } else {
-    return GetProxyByClusterId(key, proxy_info_vec);
   }
+  return GetProxyByClusterId(key, proxy_info_vec);
 }
 
 int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
@@ -289,13 +288,14 @@ int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
   return SdkCode::kSuccess;
 }
 
-bool ProxyManager::HasProxy(const std::string &inlong_group_id) {
-  if (SdkConfig::getInstance()->enable_isolation_) {
-    return CheckGroupid(inlong_group_id);
-  } else {
-    return CheckClusterId(inlong_group_id);
+bool ProxyManager::HasProxy(const std::string &group_key) {
+  if (constants::IsolationLevel::kLevelOne ==
+      SdkConfig::getInstance()->isolation_level_) {
+    return CheckGroupid(group_key);
   }
+  return CheckClusterId(group_key);
 }
+
 int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id,
                                         ProxyInfoVec &proxy_info_vec) {
   unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
@@ -319,15 +319,11 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id,
   return SdkCode::kSuccess;
 }
 std::string ProxyManager::GetGroupKey(const std::string &groupid) {
-  if (SdkConfig::getInstance()->enable_isolation_) {
-    return groupid;
-  }
-  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
-  auto it = groupid_2_cluster_id_map_.find(groupid);
-  if (it == groupid_2_cluster_id_map_.end()) {
-    return "";
+  if (constants::IsolationLevel::kLevelThird ==
+      SdkConfig::getInstance()->isolation_level_) {
+    return GetClusterID(groupid);
   }
-  return it->second;
+  return groupid;
 }
 bool ProxyManager::CheckGroupid(const std::string &groupid) {
   unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
@@ -346,9 +342,6 @@ bool ProxyManager::CheckClusterId(const std::string &cluster_id) {
   return true;
 }
 void ProxyManager::UpdateClusterId2ProxyMap() {
-  if (SdkConfig::getInstance()->enable_isolation_) {
-    return;
-  }
   unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
   for (const auto &it : groupid_2_cluster_id_update_map_) {
     ProxyInfoVec proxy_info_vec;
@@ -440,5 +433,12 @@ std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) {
   LOG_INFO("RecoverFromLocalCache:" << groupid << ",local cache:" << meta_data);
   return meta_data;
 }
-
+std::string ProxyManager::GetClusterID(const std::string &groupid) {
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+  auto it = groupid_2_cluster_id_map_.find(groupid);
+  if (it == groupid_2_cluster_id_map_.end()) {
+    return "";
+  }
+  return it->second;
+}
 } // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 6e1c58ef39..8ae859869f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -68,7 +68,7 @@ public:
   int32_t GetProxyByClusterId(const std::string &cluster_id,
                               ProxyInfoVec &proxy_info_vec);
   std::string GetGroupKey(const std::string &groupid);
-  bool HasProxy(const std::string &inlong_group_id);
+  bool HasProxy(const std::string &group_key);
   bool CheckGroupid(const std::string &groupid);
   bool CheckClusterId(const std::string &cluster_id);
   void UpdateClusterId2ProxyMap();
@@ -77,6 +77,7 @@ public:
   void ReadLocalCache();
   void WriteLocalCache();
   std::string RecoverFromLocalCache(const std::string&groupid);
+  std::string GetClusterID(const std::string &groupid);
 };
 } // namespace inlong
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 6104411b9a..0af18f7ccb 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -22,22 +22,15 @@
 #include "proxy_manager.h"
 namespace inlong {
 SendManager::SendManager() : send_group_idx_(0) {
-  for (int32_t i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size();
-       i++) {
-    LOG_INFO("SendManager, group_id:"
-             << SdkConfig::getInstance()->inlong_group_ids_[i]
-             << " send group num:"
-             << SdkConfig::getInstance()->per_groupid_thread_nums_);
-    std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey(
-        SdkConfig::getInstance()->inlong_group_ids_[i]);
-    AddSendGroup(send_group_key);
-  }
+  LOG_INFO("SendManager,send group num:"
+           << SdkConfig::getInstance()->per_groupid_thread_nums_);
 }
 
 SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) {
-  SendGroupPtr send_group_ptr = DoGetSendGroup(group_key);
+  std::string send_key= GetSendKey(group_key);
+  SendGroupPtr send_group_ptr = DoGetSendGroup(send_key);
   if (send_group_ptr == nullptr) {
-    AddSendGroup(group_key);
+    AddSendGroup(send_key);
   }
   return send_group_ptr;
 }
@@ -84,5 +77,10 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) {
   }
   return send_group_vec[send_group_idx_];
 }
-
+std::string SendManager::GetSendKey(const std::string &send_group_key) {
+  if (constants::IsolationLevel::kLevelSecond == SdkConfig::getInstance()->isolation_level_) {
+    return ProxyManager::GetInstance()->GetClusterID(send_group_key);
+  }
+  return send_group_key;
+}
 } // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
index fa627647f2..d6ab24bcbc 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
@@ -33,6 +33,7 @@ private:
   std::unordered_map<std::string, std::vector<SendGroupPtr>> send_group_map_;
   SendGroupPtr DoGetSendGroup(const std::string &send_group_key);
   void DoAddSendGroup(const std::string &send_group_key);
+  std::string GetSendKey(const std::string &send_group_key);
   volatile uint32_t send_group_idx_;
 
 public:
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index fe25ee7861..eaa2821263 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -25,6 +25,9 @@
 
 namespace inlong {
 namespace constants {
+enum IsolationLevel{
+  kLevelOne =1, kLevelSecond =2, kLevelThird =3
+};
 static const int32_t kMaxRequestTDMTimes = 4;
 static const char kAttrFormat[] =
     "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";

Reply via email to