This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b5d92d1b77 [INLONG-9228][SDK] CPP SDK supports dynamic load balancing (#9235)
b5d92d1b77 is described below

commit b5d92d1b77c25dd11a0f2f257289ea584cc3bb4f
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Nov 8 11:28:59 2023 +0800

    [INLONG-9228][SDK] CPP SDK supports dynamic load balancing (#9235)
---
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h       |   6 +-
 .../dataproxy-sdk-cpp/src/client/stat.h            |  11 +-
 .../dataproxy-sdk-cpp/src/client/tcp_client.cc     | 194 ++++++++++--
 .../dataproxy-sdk-cpp/src/client/tcp_client.h      |  34 +-
 .../dataproxy-sdk-cpp/src/config/proxy_info.h      |   8 +-
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |  30 ++
 .../dataproxy-sdk-cpp/src/group/send_group.cc      | 348 +++++++++++++++++----
 .../dataproxy-sdk-cpp/src/group/send_group.h       |  42 ++-
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc |  13 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |  10 +
 10 files changed, 591 insertions(+), 105 deletions(-)

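A note on the mechanism (not part of the commit message): each TcpClient now sends periodic binary heartbeats (msg type 8) and stores the load value returned in the ack into a 30-slot window (proxy_loads_). GetAvgLoad() folds that window into one number using the constants::kWeight table, and SendGroup periodically swaps its most loaded work client for a lightly loaded reserve client when the gap exceeds load_threshold_. The standalone C++ sketch below only illustrates the weighted-average step; WeightedAvgLoad and its sample values are illustrative names, not SDK API.

  // Illustrative sketch of the weighted load average used for balancing;
  // mirrors the logic of TcpClient::GetAvgLoad() in the diff below.
  #include <cstdint>
  #include <iostream>
  #include <vector>

  int32_t WeightedAvgLoad(const std::vector<int32_t> &loads,
                          const std::vector<int32_t> &weights) {
    int64_t numerator = 0;
    int64_t denominator = 0;
    // Zero (unset) samples are skipped, so an empty window reports load 0.
    for (size_t i = 0; i < loads.size() && i < weights.size(); ++i) {
      if (loads[i] > 0) {
        numerator += static_cast<int64_t>(loads[i]) * weights[i];
        denominator += weights[i];
      }
    }
    return denominator == 0 ? 0 : static_cast<int32_t>(numerator / denominator);
  }

  int main() {
    std::vector<int32_t> weights = {1, 1, 1, 2, 2};   // shortened stand-in for kWeight
    std::vector<int32_t> loads = {10, 0, 30, 40, 50}; // heartbeat load samples
    std::cout << WeightedAvgLoad(loads, weights) << std::endl; // prints 36
    return 0;
  }
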
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 4407869ba7..609d6c35b8 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -54,6 +54,8 @@ public:
   uint32_t per_groupid_thread_nums_; // Sending thread per groupid
  uint32_t dispatch_interval_zip_;   // Compression thread distribution interval
   uint32_t dispatch_interval_send_;  // sending thread sending interval
+  uint32_t load_balance_interval_;
+  uint32_t heart_beat_interval_;
 
   // Packaging parameters
   bool enable_pack_;
@@ -77,7 +79,8 @@ public:
   std::string manager_cluster_url_;
   uint32_t manager_update_interval_; // Automatic update interval, minutes
   uint32_t manager_url_timeout_;     // URL parsing timeout, seconds
-  uint32_t max_proxy_num_;
+  uint64_t max_proxy_num_;
+  uint64_t reserve_proxy_num_;
   uint32_t msg_type_;
   bool enable_isolation_;
 
@@ -85,6 +88,7 @@ public:
   bool enable_tcp_nagle_;
  uint64_t tcp_idle_time_;          // The time when tcpclient did not send data
   uint32_t tcp_detection_interval_; // tcp-client detection interval
+  bool enable_balance_;
 
   // auth settings
   bool need_auth_;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
index 695a66e31f..f57a26565f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
@@ -28,29 +28,34 @@ private:
 
   uint64_t send_failed_pack_num_;
   uint64_t send_failed_msg_num_;
+  uint64_t time_cost_;
 
 public:
   Stat()
       : send_success_pack_num_(0), send_success_msg_num_(0),
-        send_failed_pack_num_(0), send_failed_msg_num_(0) {}
+        send_failed_pack_num_(0), send_failed_msg_num_(0) ,time_cost_(0) {}
 
   void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; }
   void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; }
   void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; }
   void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; }
+  void AddTimeCost(uint64_t time_cost) { time_cost_ += time_cost; }
 
   void ResetStat() {
     send_success_pack_num_ = 0;
     send_success_msg_num_ = 0;
     send_failed_pack_num_ = 0;
     send_failed_msg_num_ = 0;
+    time_cost_ = 0;
   }
   std::string ToString() {
     std::stringstream stat;
     stat << "success-pack[" << send_success_pack_num_ << "]";
-    stat << " success-msg[" << send_success_msg_num_ << "]";
+    stat << "msg[" << send_success_msg_num_ << "]";
     stat << " failed-pack[" << send_failed_pack_num_ << "]";
-    stat << " failed-msg[" << send_failed_msg_num_ << "]";
+    stat << "msg[" << send_failed_msg_num_ << "]";
+    uint64_t pack_num = send_success_pack_num_ + send_failed_msg_num_ + 1;
+    stat << " trans[" << time_cost_ / pack_num << "]";
     return stat.str();
   }
 };
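
(For readers of the new stat line: trans[] is the accumulated per-pack latency time_cost_ divided by (send_success_pack_num_ + send_failed_msg_num_ + 1) as written in ToString(); the +1 guards against division by zero before anything has been sent. For example, 1200 ms accumulated over 3 successful packs with no failures prints trans[300], i.e. 1200 / (3 + 0 + 1).)
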
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index 8a2a583b3b..e27910c4e9 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -29,8 +29,9 @@ TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port)
       wait_timer_(std::make_shared<asio::steady_timer>(io_context)),
       keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)),
      ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), port),
-      status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false) {
-  ;
+      status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false),
+      proxy_loads_(30), wait_heart_beat_(false), reset_client_(false),
+      heart_beat_index_(0), only_heart_heat_(false) {
   client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
 
   tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_;
@@ -82,7 +83,6 @@ void TcpClient::AsyncConnect() {
       asio::error_code error;
       socket_->close(error);
       if (asio::error::operation_aborted == error) {
-        // operation aborted
         return;
       }
     }
@@ -102,7 +102,6 @@ void TcpClient::DoAsyncConnect(asio::error_code error) {
   }
   if (error) {
     if (asio::error::operation_aborted == error) {
-      // operation aborted必
       return;
     }
   }
@@ -122,7 +121,6 @@ void TcpClient::OnConnected(asio::error_code error) {
     return;
   }
   if (asio::error::operation_aborted == error) {
-    // operation aborted
     return;
   }
   status_ = kConnectFailed;
@@ -164,23 +162,22 @@ void TcpClient::OnWroten(const asio::error_code error,
   }
   if (error) {
     if (asio::error::operation_aborted == error) {
-      // operation aborted
       return;
     }
-    status_ = kWriting;
     LOG_ERROR("write error:" << error.message() << CLIENT_INFO);
+    status_ = kWriting;
     HandleFail();
     return;
   }
 
   if (0 == bytes_transferred) {
-    status_ = kWaiting;
     LOG_ERROR("transferred 0 bytes." << CLIENT_INFO);
+    status_ = kWaiting;
     HandleFail();
     return;
   }
 
-  status_ = CLIENT_RESPONSE;
+  status_ = kClientResponse;
   asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)),
                    std::bind(&TcpClient::OnReturn, this, std::placeholders::_1,
                              std::placeholders::_2));
@@ -191,17 +188,16 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
   }
   if (error) {
     if (asio::error::operation_aborted == error) {
-      // operation aborted
       return;
     }
     LOG_ERROR("OnReturn error:" << error.message() << CLIENT_INFO);
     status_ = kWaiting;
+    std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
     HandleFail();
     return;
   }
   if (len != sizeof(uint32_t)) {
     status_ = kWaiting;
-    LOG_ERROR("len :" << len <<" != sizeof(uint32_t):" <<sizeof(uint32_t)<< CLIENT_INFO);
     HandleFail();
     return;
   }
@@ -210,11 +206,11 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
 
   if (resp_len > recv_buf_->m_max_size) {
     status_ = kWaiting;
-    LOG_ERROR("invalid resp_len :" << resp_len << CLIENT_INFO);
     HandleFail();
     return;
   }
-  asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, resp_len),
+  asio::async_read(*socket_,
+                   asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len),
                    std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
                              std::placeholders::_2));
 }
@@ -226,7 +222,6 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {
 
   if (error) {
     if (asio::error::operation_aborted == error) {
-      // operation aborted
       return;
     }
     LOG_ERROR("OnBody error:" << error.message() << CLIENT_INFO);
@@ -234,12 +229,27 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {
     HandleFail();
     return;
   }
+  uint32_t parse_index = sizeof(uint32_t);
+  uint8_t msg_type =
+      *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
 
-  if (sendBuffer_ != nullptr) {
-    stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
-    stat_.AddSendSuccessPackNum(1);
+  switch (msg_type) {
+  case 8:
+    ParseHeartBeat(bytesTransferred);
+    break;
+  default:
+    ParseGenericResponse();
+    break;
+  }
+  if (wait_heart_beat_) {
+    HeartBeat();
+    wait_heart_beat_ = false;
+    return;
+  }
 
-    sendBuffer_->releaseBuf();
+  if (reset_client_) {
+    RestClient();
+    reset_client_ = false;
   }
 
   status_ = kFree;
@@ -255,6 +265,8 @@ void TcpClient::HandleFail() {
     stat_.AddSendFailMsgNum(sendBuffer_->msgCnt());
     stat_.AddSendFailPackNum(1);
 
+    stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+
     sendBuffer_->doUserCallBack();
     sendBuffer_->releaseBuf();
   }
@@ -269,12 +281,14 @@ void TcpClient::DetectStatus(const asio::error_code error) {
   if (error) {
     return;
   }
-
-  LOG_INFO(stat_.ToString() << CLIENT_INFO);
-  stat_.ResetStat();
+  if (!only_heart_heat_) {
+    LOG_INFO(stat_.ToString() << CLIENT_INFO);
+    stat_.ResetStat();
+  }
 
   if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
       status_ != kConnecting) {
+    std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
     LOG_INFO("reconnect because it has idle "
              << tcp_idle_time_ << " ms."
              << "last send time:" << last_update_time_ << CLIENT_INFO);
@@ -287,4 +301,142 @@ void TcpClient::DetectStatus(const asio::error_code error) {
       std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
 }
 
+void TcpClient::HeartBeat(bool only_heart_heat) {
+  if (kStopped == status_ || exit_) {
+    return;
+  }
+  only_heart_heat_ = only_heart_heat;
+  status_ = kHeartBeat;
+  last_update_time_ = Utils::getCurrentMsTime();
+  // status_ = kWriting;
+
+  bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4);
+  bin_hb_.msg_type = 8;
+  bin_hb_.data_time =
+      htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+  bin_hb_.body_ver = 1;
+  bin_hb_.body_len = 0;
+  bin_hb_.attr_len = 0;
+  bin_hb_.magic = htons(constants::kBinaryMagic);
+  char *hb = (char *)&bin_hb_;
+  uint32_t hb_len = sizeof(bin_hb_);
+
+  asio::async_write(*socket_, asio::buffer(hb, hb_len),
+                    std::bind(&TcpClient::OnWroten, this, std::placeholders::_1,
+                              std::placeholders::_2));
+}
+
+void TcpClient::ParseHeartBeat(size_t total_length) {
+  //  | total length(4) | msg type(1) | data time(4) | body version(1) | body
+  //  length (4) | body | attr length(2) | attr | magic (2) | skip total length
+  uint32_t parse_index = sizeof(uint32_t);
+  //  skip msg type
+  parse_index += sizeof(uint8_t);
+  //  skip data time
+  //  uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
+  //  *>(recv_buf_->m_data + parse_index));
+  parse_index += sizeof(uint32_t);
+
+  // 3、parse body version
+  uint32_t body_version =
+      *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+  parse_index += sizeof(uint8_t);
+
+  // 4、parse body length
+  uint32_t body_length = ntohl(
+      *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index));
+  parse_index += sizeof(uint32_t);
+
+  // 5 parse load
+  int16_t load = ntohs(
+      *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index));
+  parse_index += sizeof(int16_t);
+
+  // 7 parse attr length
+  uint16_t attr_length = ntohs(
+      *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+  parse_index += sizeof(uint16_t);
+
+  // 8 skip attr
+  parse_index += attr_length;
+
+  // 9 parse magic
+  uint16_t magic = ntohs(
+      *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+  parse_index += sizeof(uint16_t);
+
+  if (magic != constants::kBinaryMagic) {
+    LOG_ERROR("failed to parse heartbeat ack! error magic "
+              << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO);
+    return;
+  }
+
+  if (total_length + 4 != parse_index) {
+    LOG_ERROR("failed to parse heartbeat ack! total_length "
+              << total_length << " +4 !=" << parse_index << CLIENT_INFO);
+    return;
+  }
+  if (heart_beat_index_ > constants::MAX_STAT) {
+    heart_beat_index_ = 0;
+  }
+  if (body_version == 1 && body_length == 2) {
+    proxy_loads_[heart_beat_index_++ % 30] = load;
+  } else {
+    proxy_loads_[heart_beat_index_++ % 30] = 0;
+  }
+  LOG_INFO("current load is " << load << CLIENT_INFO);
+}
+
+void TcpClient::ParseGenericResponse() {
+  if (sendBuffer_ != nullptr) {
+    stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+    stat_.AddSendSuccessPackNum(1);
+
+    stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+
+    sendBuffer_->releaseBuf();
+  }
+}
+
+int32_t TcpClient::GetAvgLoad() {
+  int32_t numerator = 0;
+  int32_t denominator = 0;
+  for (int i = 0; i < proxy_loads_.size(); i++) {
+    if (proxy_loads_[i] > 0) {
+      numerator += proxy_loads_[i] * constants::kWeight[i];
+      denominator += constants::kWeight[i];
+    }
+  }
+  int32_t avg_load = 0;
+  if (0 == denominator) {
+    return avg_load;
+  }
+  avg_load = numerator / denominator;
+  LOG_INFO("average load is " << avg_load << CLIENT_INFO);
+  return avg_load;
+}
+
+void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; }
+
+void TcpClient::UpdateClient(const std::string ip, const uint32_t port) {
+  LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port
+                           << "] replace" << CLIENT_INFO);
+  ip_ = ip;
+  port_ = port;
+  reset_client_ = true;
+}
+void TcpClient::RestClient() {
+  std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
+  asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip_), port_);
+  endpoint_ = endpoint;
+  client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
+
+  LOG_INFO("RestClient[" << only_heart_heat_ << "]" << CLIENT_INFO);
+
+  AsyncConnect();
+}
+const std::string &TcpClient::getIp() const { return ip_; }
+const std::string &TcpClient::getClientInfo() const { return client_info_; }
+uint32_t TcpClient::getPort() const { return port_; }
+
 } // namespace inlong
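
For orientation, the heartbeat request built in HeartBeat() above follows the frame documented in ParseHeartBeat(). BinaryHB itself is declared in msg_protocol.h, which this patch does not touch, so the packed struct below is only a hypothetical sketch of a layout consistent with the fields the new code fills; the real definition may differ.

  // Hypothetical sketch (BinaryHB lives in msg_protocol.h, outside this diff):
  // | total length(4) | msg type(1) | data time(4) | body version(1) |
  // | body length(4) | body | attr length(2) | attr | magic(2) |
  #include <cstdint>

  #pragma pack(push, 1)
  struct BinaryHBSketch {
    uint32_t total_len; // htonl(sizeof(BinaryHB) - 4): length excludes this field
    uint8_t  msg_type;  // 8 marks a heartbeat
    uint32_t data_time; // htonl(seconds since epoch)
    uint8_t  body_ver;  // ack carries a 2-byte load when body version is 1
    uint32_t body_len;  // 0 in the request
    uint16_t attr_len;  // 0 in the request
    uint16_t magic;     // htons(constants::kBinaryMagic)
  };
  #pragma pack(pop)
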
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
index be0b6d915a..6835a7fd26 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
@@ -24,6 +24,7 @@
 #include "../utils/capi_constant.h"
 #include "../utils/read_write_mutex.h"
 #include "../utils/send_buffer.h"
+#include "msg_protocol.h"
 #include "stat.h"
 #include <queue>
 
@@ -36,8 +37,10 @@ enum ClientStatus {
   kConnectFailed = 4,
   kWaiting = 5,
   kStopped = 6,
-  CLIENT_RESPONSE = 7
+  kClientResponse = 7,
+  kHeartBeat = 8
 };
+
 enum {
   kConnectTimeout = 1000 * 20,
 };
@@ -50,9 +53,23 @@ private:
   SteadyTimerPtr keep_alive_timer_;
   ClientStatus status_;
   std::string ip_;
+
+public:
+  const std::string &getIp() const;
+
+private:
   uint32_t port_;
+
+public:
+  uint32_t getPort() const;
+
+private:
   std::string client_info_;
 
+public:
+  const std::string &getClientInfo() const;
+
+private:
   std::shared_ptr<SendBuffer> sendBuffer_;
   asio::ip::tcp::endpoint endpoint_;
   BlockMemoryPtrT recv_buf_;
@@ -61,6 +78,12 @@ private:
   uint64_t last_update_time_;
   Stat stat_;
   bool exit_;
+  BinaryHB bin_hb_ = {0};
+  std::vector<int32_t> proxy_loads_;
+  uint32_t heart_beat_index_;
+  bool wait_heart_beat_;
+  bool reset_client_;
+  volatile bool only_heart_heat_;
 
 public:
   TcpClient(IOContext &io_context, std::string ip, uint32_t port);
@@ -74,9 +97,16 @@ public:
   void OnBody(asio::error_code error, size_t bytesTransferred);
   void DoClose();
   void HandleFail();
-  bool isFree() { return (status_ == kFree); };
+  bool isFree() { return (status_ == kFree); }
   void write(SendBufferPtrT sendBuffer);
   void DetectStatus(const asio::error_code error);
+  void HeartBeat(bool only_heart_heat = false);
+  void SetHeartBeatStatus();
+  void ParseHeartBeat(size_t total_length);
+  void ParseGenericResponse();
+  void UpdateClient(const std::string ip, const uint32_t port);
+  void RestClient();
+  int32_t GetAvgLoad();
 };
 typedef std::shared_ptr<TcpClient> TcpClientTPtrT;
 typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
index d433e3a295..59700a473f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
@@ -29,13 +29,17 @@ private:
   std::string proxy_str_id_;
   std::string ip_;
   int32_t port_;
+  int32_t load_;
 
 public:
-  ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port)
-      : proxy_str_id_(proxy_str_id), ip_(ip), port_(port) {}
+  ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port,int32_t load)
+      : proxy_str_id_(proxy_str_id), ip_(ip), port_(port), load_(load) {}
   ProxyInfo(){};
+  void setIp(const std::string& ip) { ip_ = ip; }
+  void setPort(int32_t port) { port_ = port; }
   std::string ip() const { return ip_; }
   int32_t port() const { return port_; }
+  int32_t GetLoad() const { return load_; }
 };
 using ProxyInfoVec = std::vector<ProxyInfo>;
 } // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 3620d8985e..91e2fa8ad4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -78,6 +78,9 @@ void SdkConfig::defaultInit() {
   dispatch_interval_zip_ = constants::kDispatchIntervalZip;
   tcp_detection_interval_ = constants::kTcpDetectionInterval;
   tcp_idle_time_ = constants::kTcpIdleTime;
+  load_balance_interval_ = constants::kLoadBalanceInterval;
+  heart_beat_interval_ = constants::kHeartBeatInterval;
+  enable_balance_ = constants::kEnableBalance;
 
   // cache parameter
   send_buf_size_ = constants::kSendBufSize;
@@ -107,6 +110,7 @@ void SdkConfig::defaultInit() {
   manager_url_timeout_ = constants::kManagerTimeout;
   max_proxy_num_ = constants::kMaxProxyNum;
   enable_isolation_ = constants::kEnableIsolation;
+  reserve_proxy_num_ = constants::kReserveProxyNum;
 
   local_ip_ = constants::kSerIP;
   local_port_ = constants::kSerPort;
@@ -144,6 +148,24 @@ void SdkConfig::InitThreadParam(const rapidjson::Value &doc) {
   } else {
     dispatch_interval_send_ = constants::kDispatchIntervalSend;
   }
+
+  if (doc.HasMember("load_balance_interval") &&
+      doc["load_balance_interval"].IsInt() &&
+      doc["load_balance_interval"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["load_balance_interval"];
+    load_balance_interval_ = obj.GetInt();
+  } else {
+    load_balance_interval_ = constants::kLoadBalanceInterval;
+  }
+
+  if (doc.HasMember("heart_beat_interval") &&
+      doc["heart_beat_interval"].IsInt() &&
+      doc["heart_beat_interval"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["heart_beat_interval"];
+    heart_beat_interval_ = obj.GetInt();
+  } else {
+    heart_beat_interval_ = constants::kHeartBeatInterval;
+  }
 }
 
 void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
@@ -360,6 +382,14 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
   } else {
     tcp_idle_time_ = constants::kTcpIdleTime;
   }
+
+  // enable balance
+  if (doc.HasMember("enable_balance") && doc["enable_balance"].IsBool()) {
+    const rapidjson::Value &obj = doc["enable_balance"];
+    enable_balance_ = obj.GetBool();
+  } else {
+    enable_balance_ = constants::kEnableBalance;
+  }
 }
 void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
   // auth settings
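
The three new keys accepted above can be set in the SDK's JSON config. The fragment below shows only those keys, using the default values this patch adds to capi_constant.h (both intervals are read as milliseconds in send_group.cc); merge it into an existing config file, whose remaining schema this commit leaves unchanged.

  {
    "load_balance_interval": 300000,
    "heart_beat_interval": 60000,
    "enable_balance": true
  }
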
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index fb91972318..4591953fa0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -25,19 +25,26 @@
 
 namespace inlong {
 const int kDefaultQueueSize = 20;
-SendGroup::SendGroup(std::string group_id)
-    : work_(asio::make_work_guard(io_context_)), group_id_(group_id),
-      send_idx_(0) {
+SendGroup::SendGroup(std::string send_group_key)
+    : work_(asio::make_work_guard(io_context_)),
+      send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0),
+      load_threshold_(0), max_proxy_num_(0) {
   max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ /
                         SdkConfig::getInstance()->pack_size_;
   if (max_send_queue_num_ <= 0) {
     max_send_queue_num_ = kDefaultQueueSize;
   }
-  LOG_INFO("SendGroup  max_send_queue_num " << max_send_queue_num_);
+  LOG_INFO("SendGroup: " << send_group_key_
+                         << ", max send queue num: " << max_send_queue_num_);
   dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
-  tcp_clients_old_ = nullptr;
-  tcp_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
-  tcp_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_);
+  load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
+  heart_beat_interval_ =
+      SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_;
+  need_balance_ = SdkConfig::getInstance()->enable_balance_;
+
+  work_clients_old_ = nullptr;
+  work_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
+  work_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_);
 
   send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
   send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
@@ -49,13 +56,29 @@ SendGroup::SendGroup(std::string group_id)
   update_conf_timer_->async_wait(
       std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
 
-  current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
+  if (SdkConfig::getInstance()->enable_balance_) {
+    load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+    load_balance_timer_->expires_after(
+        std::chrono::milliseconds(load_balance_interval_));
+    load_balance_timer_->async_wait(
+        std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+  }
+
+  current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
   thread_ = std::thread(&SendGroup::Run, this);
 }
 SendGroup::~SendGroup() {
   LOG_INFO("~SendGroup ");
-  send_timer_->cancel();
-  update_conf_timer_->cancel();
+  if (send_timer_) {
+    send_timer_->cancel();
+  }
+  if (update_conf_timer_) {
+    update_conf_timer_->cancel();
+  }
+  if (load_balance_timer_) {
+    update_conf_timer_->cancel();
+  }
+
   io_context_.stop();
   if (thread_.joinable()) {
     thread_.join();
@@ -76,19 +99,18 @@ void SendGroup::DispatchData(std::error_code error) {
     return;
   }
   try {
-    unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
-    if (tcp_clients_ != nullptr) {
-      if (send_idx_ >= tcp_clients_->size()) {
+    unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+    if (work_clients_ != nullptr) {
+      if (send_idx_ >= work_clients_->size()) {
         send_idx_ = 0;
       }
-
-      while (send_idx_ < tcp_clients_->size()) {
-        if ((*tcp_clients_)[send_idx_]->isFree()) {
+      while (send_idx_ < work_clients_->size()) {
+        if ((*work_clients_)[send_idx_]->isFree()) {
           SendBufferPtrT send_buf = PopData();
           if (send_buf == nullptr) {
             break;
           }
-          (*tcp_clients_)[send_idx_]->write(send_buf);
+          (*work_clients_)[send_idx_]->write(send_buf);
         }
         send_idx_++;
       }
@@ -96,10 +118,33 @@ void SendGroup::DispatchData(std::error_code error) {
   } catch (std::exception &e) {
     LOG_ERROR("Exception " << e.what());
   }
+
+  if (need_balance_ && dispatch_stat_++ > heart_beat_interval_) {
+    HeartBeat();
+    dispatch_stat_ = 0;
+  }
+
   send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
   send_timer_->async_wait(
       std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
 }
+void SendGroup::HeartBeat() {
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  if (work_clients_ == nullptr) {
+    return;
+  }
+  for (int idx = 0; idx < work_clients_->size(); idx++) {
+    if ((*work_clients_)[idx]->isFree()) {
+      (*work_clients_)[idx]->HeartBeat();
+    } else {
+      (*work_clients_)[idx]->SetHeartBeatStatus();
+    }
+  }
+
+  for (int idx = 0; idx < reserve_clients_.size(); idx++) {
+    reserve_clients_[idx]->HeartBeat(true);
+  }
+}
 
 bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; }
 
@@ -120,30 +165,36 @@ void SendGroup::UpdateConf(std::error_code error) {
 
   ClearOldTcpClients();
 
-  ProxyInfoVec new_proxy_info;
-  if (ProxyManager::GetInstance()->GetProxy(group_id_, new_proxy_info) !=
+  ProxyInfoVec new_bus_info;
+  if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) !=
           kSuccess ||
-      new_proxy_info.empty()) {
+      new_bus_info.empty()) {
     update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
     update_conf_timer_->async_wait(
         std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
     return;
   }
+  if (new_bus_info.empty()) {
+    LOG_INFO("UpdateConf new_bus_info is empty!");
+    return;
+  }
+
+  load_threshold_ = new_bus_info[0].GetLoad() > constants::kDefaultLoadThreshold
+                        ? constants::kDefaultLoadThreshold
+                        : std::max((new_bus_info[0].GetLoad()), 0);
 
-  if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
-    LOG_INFO("Don`t need UpdateConf. current proxy size("
-             << current_proxy_vec_.size() << ")=proxy size("
-             << new_proxy_info.size() << ")");
+  if (!IsConfChanged(current_bus_vec_, new_bus_info)) {
+    LOG_INFO("Don`t need UpdateConf. current bus size("
+             << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size()
+             << ")");
     update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
     update_conf_timer_->async_wait(
         std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
     return;
   }
 
-  uint32_t proxy_num = SdkConfig::getInstance()->max_proxy_num_;
-  if (proxy_num > new_proxy_info.size()) {
-    proxy_num = new_proxy_info.size();
-  }
+  max_proxy_num_ =
+      std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size());
 
   std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
       std::make_shared<std::vector<TcpClientTPtrT>>();
@@ -155,32 +206,33 @@ void SendGroup::UpdateConf(std::error_code error) {
     return;
   }
 
-  std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end());
+  std::random_shuffle(new_bus_info.begin(), new_bus_info.end());
 
-  tcp_clients_tmp->reserve(proxy_num);
-  for (int i = 0; i < proxy_num; i++) {
-    ProxyInfo proxy_tmp = new_proxy_info[i];
-    TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(
-        io_context_, proxy_tmp.ip(), proxy_tmp.port());
+  tcp_clients_tmp->reserve(max_proxy_num_);
+  for (int i = 0; i < max_proxy_num_; i++) {
+    ProxyInfo bus_tmp = new_bus_info[i];
+    TcpClientTPtrT tcpClientTPtrT =
+        std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
     tcp_clients_tmp->push_back(tcpClientTPtrT);
-    LOG_INFO("new proxy info.[" << proxy_tmp.ip() << ":" << proxy_tmp.port()
-                                << "]");
+    LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]");
   }
 
   {
     LOG_INFO("do change tcp clients.");
-    unique_write_lock<read_write_mutex> wtlck(remote_proxy_list_mutex_);
-    tcp_clients_old_ = tcp_clients_;
-    tcp_clients_ = tcp_clients_tmp;
+    unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_);
+    work_clients_old_ = work_clients_;
+    work_clients_ = tcp_clients_tmp;
   }
 
-  if (tcp_clients_old_ != nullptr) {
-    for (int j = 0; j < tcp_clients_old_->size(); j++) {
-      (*tcp_clients_old_)[j]->DoClose();
+  if (work_clients_old_ != nullptr) {
+    for (int j = 0; j < work_clients_old_->size(); j++) {
+      (*work_clients_old_)[j]->DoClose();
     }
   }
 
-  current_proxy_vec_ = new_proxy_info;
+  current_bus_vec_ = new_bus_info;
+
+  InitReserveClient();
 
   update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
   update_conf_timer_->async_wait(
@@ -204,25 +256,25 @@ uint32_t SendGroup::GetQueueSize() {
   return send_buf_list_.size();
 }
 
-bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec,
-                              ProxyInfoVec &new_proxy_vec) {
-  if (new_proxy_vec.empty())
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec,
+                              ProxyInfoVec &new_bus_vec) {
+  if (new_bus_vec.empty())
     return false;
-  if (current_proxy_vec.size() != new_proxy_vec.size()) {
+  if (current_bus_vec.size() != new_bus_vec.size()) {
     return true;
   }
 
-  for (auto &current_bu : current_proxy_vec) {
-    for (int i = 0; i < new_proxy_vec.size(); i++) {
-      if ((current_bu.ip() == new_proxy_vec[i].ip()) &&
-          (current_bu.port() == new_proxy_vec[i].port()))
+  for (auto &current_bu : current_bus_vec) {
+    for (int i = 0; i < new_bus_vec.size(); i++) {
+      if ((current_bu.ip() == new_bus_vec[i].ip()) &&
+          (current_bu.port() == new_bus_vec[i].port()))
         break;
-      if (i == (new_proxy_vec.size() - 1)) {
-        if ((current_bu.ip() != new_proxy_vec[i].ip() ||
-             current_bu.port() == new_proxy_vec[i].port())) {
-          LOG_INFO("current proxy ip." << current_bu.ip() << ":"
-                                       << current_bu.port()
-                                       << " can`t find in proxy.");
+      if (i == (new_bus_vec.size() - 1)) {
+        if ((current_bu.ip() != new_bus_vec[i].ip() ||
+             current_bu.port() == new_bus_vec[i].port())) {
+          LOG_INFO("current bus ip." << current_bu.ip() << ":"
+                                     << current_bu.port()
+                                     << " can`t find in bus.");
           return true;
         }
       }
@@ -232,20 +284,188 @@ bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec,
 }
 
 bool SendGroup::IsAvailable() {
-  unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
-  if (tcp_clients_ == nullptr) {
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  if (work_clients_ == nullptr) {
     return false;
   }
-  if (tcp_clients_->empty()) {
+  if (work_clients_->empty()) {
     return false;
   }
   return true;
 }
 void SendGroup::ClearOldTcpClients() {
-  if (tcp_clients_old_ != nullptr) {
-    LOG_INFO("ClearOldTcpClients." << tcp_clients_old_->size());
-    tcp_clients_old_->clear();
-    tcp_clients_old_.reset();
+  if (work_clients_old_ != nullptr) {
+    LOG_INFO("ClearOldTcpClients." << work_clients_old_->size());
+    work_clients_old_->clear();
+    work_clients_old_.reset();
   }
 }
+
+void SendGroup::LoadBalance(std::error_code error) {
+  if (error) {
+    return;
+  }
+
+  if (NeedDoLoadBalance()) {
+    DoLoadBalance();
+  }
+  uint64_t interval = load_balance_interval_ + rand() % 120 * 1000;
+  LOG_INFO("LoadBalance interval:" << interval);
+  load_balance_timer_->expires_after(std::chrono::milliseconds(interval));
+  load_balance_timer_->async_wait(
+      std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+}
+
+void SendGroup::DoLoadBalance() {
+  if (reserve_clients_.empty()) {
+    return;
+  }
+
+  TcpClientTPtrT work_client = GetMaxLoadClient();
+  TcpClientTPtrT reserve_client = GetReserveClient();
+  if (reserve_client == nullptr || work_client == nullptr) {
+    LOG_ERROR("client nullptr");
+    return;
+  }
+
+  if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) >
+      load_threshold_) {
+    LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace"
+                              << work_client->getClientInfo() << ",load[work "
+                              << work_client->GetAvgLoad() << "][reserve "
+                              << reserve_client->GetAvgLoad() << "][threshold "
+                              << load_threshold_ << "]");
+    std::string ip = work_client->getIp();
+    uint32_t port = work_client->getPort();
+    work_client->UpdateClient(reserve_client->getIp(),
+                              reserve_client->getPort());
+
+    ProxyInfo proxy = GetRandomProxy(ip, port);
+    if (!proxy.ip().empty()) {
+      reserve_client->UpdateClient(proxy.ip(), proxy.port());
+    }
+  }
+
+  need_balance_ = true;
+}
+bool SendGroup::NeedDoLoadBalance() {
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  if (load_threshold_ <= 0 ||
+      work_clients_->size() == current_bus_vec_.size()) {
+    LOG_INFO("Don`t need DoLoadBalance [load_threshold]:"
+             << load_threshold_
+             << ",[tcp_client size]:" << work_clients_->size()
+             << ",[current_bus_vec size]:" << current_bus_vec_.size());
+    need_balance_ = false;
+    return false;
+  }
+  need_balance_ = true;
+  return true;
+}
+void SendGroup::InitReserveClient() {
+  if (max_proxy_num_ >= current_bus_vec_.size()) {
+    return;
+  }
+  uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_;
+  uint64_t reserve_num =
+      std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
+  if (reserve_num <= 0) {
+    return;
+  }
+
+  unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_);
+  reserve_clients_.clear();
+
+  for (uint64_t i = current_bus_vec_.size() - reserve_num;
+       i < current_bus_vec_.size(); i++) {
+    ProxyInfo bus_tmp = current_bus_vec_[i];
+    TcpClientTPtrT tcpClientTPtrT =
+        std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
+    reserve_clients_.push_back(tcpClientTPtrT);
+  }
+  LOG_INFO(
+      "InitReserveClient reserve_clients size:" << reserve_clients_.size());
+}
+bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end) {
+  if (begin && end) {
+    return begin->GetAvgLoad() < end->GetAvgLoad();
+  }
+  return false;
+}
+
+TcpClientTPtrT SendGroup::GetMaxLoadClient() {
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  uint32_t client_index = 0;
+  int32_t max_load = (*work_clients_)[0]->GetAvgLoad();
+  for (int index = 1; index < work_clients_->size(); index++) {
+    int32_t proxy_load = (*work_clients_)[index]->GetAvgLoad();
+    if (proxy_load > max_load) {
+      max_load = proxy_load;
+      client_index = index;
+    }
+  }
+  return (*work_clients_)[client_index];
+}
+
+ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) {
+  ProxyInfo proxy_info;
+  for (auto &it : current_bus_vec_) {
+    if (it.ip() == ip && it.port() == port) {
+      continue;
+    }
+    bool exist = false;
+    for (int index = 0; index < reserve_clients_.size(); index++) {
+      if (it.ip() == reserve_clients_[index]->getIp() &&
+          it.port() == reserve_clients_[index]->getPort()) {
+        exist = true;
+        break;
+      }
+    }
+    if (exist) {
+      continue;
+    }
+    if (ExistInWorkClient(it.ip(), it.port())) {
+      continue;
+    }
+    proxy_info.setIp(it.ip());
+    proxy_info.setPort(it.port());
+    return proxy_info;
+  }
+  return proxy_info;
+}
+
+TcpClientTPtrT SendGroup::GetReserveClient() {
+  std::sort(reserve_clients_.begin(), reserve_clients_.end(), UpSort);
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  ProxyInfo proxy_info;
+  for (auto &it : reserve_clients_) {
+    if (it->GetAvgLoad() <= 0) {
+      continue;
+    }
+    bool exist = false;
+    for (int index = 0; index < work_clients_->size(); index++) {
+      if (it->getIp() == (*work_clients_)[index]->getIp() &&
+          it->getPort() == (*work_clients_)[index]->getPort()) {
+        exist = true;
+        break;
+      }
+    }
+    if (exist) {
+      continue;
+    }
+    return it;
+  }
+  return nullptr;
+}
+
+bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) {
+  unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+  for (int index = 0; index < work_clients_->size(); index++) {
+    if (ip == (*work_clients_)[index]->getIp() &&
+        port == (*work_clients_)[index]->getPort()) {
+      return true;
+    }
+  }
+  return false;
+}
 } // namespace inlong
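
Timing note for the new SendGroup logic: with the defaults added in capi_constant.h, DispatchData() triggers a heartbeat round roughly every kHeartBeatInterval / kDispatchIntervalSend = 60000 ms / 10 ms = 6000 dispatch ticks, and LoadBalance() re-arms every kLoadBalanceInterval (300000 ms) plus a random 0-119 s jitter (load_balance_interval_ + rand() % 120 * 1000). A work/reserve swap only happens when the busiest work client's weighted average load exceeds the chosen reserve client's by more than load_threshold_, which UpdateConf() caps at kDefaultLoadThreshold (200).
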
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
index 88dc90771c..a52d63dcd3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -39,27 +39,37 @@ private:
   io_context_work work_;
   std::thread thread_;
   void Run();
+  uint64_t max_proxy_num_;
 
 public:
-  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_;
-  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_old_;
+  std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_;
+  std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_;
+  std::vector<TcpClientTPtrT> reserve_clients_;
 
-  ProxyInfoVec current_proxy_vec_;
-  TcpClientTPtrVecItT current_client_;
+  ProxyInfoVec current_bus_vec_;
   std::queue<SendBufferPtrT> send_buf_list_;
 
   SteadyTimerPtr send_timer_;
   SteadyTimerPtr update_conf_timer_;
+  SteadyTimerPtr load_balance_timer_;
 
   read_write_mutex send_group_mutex_;
-  read_write_mutex remote_proxy_list_mutex_;
+  read_write_mutex work_clients_mutex_;
+  read_write_mutex reserve_clients_mutex_;
   std::mutex mutex_;
-  std::string group_id_;
+  std::string send_group_key_;
 
   std::uint32_t send_idx_;
   uint32_t max_send_queue_num_;
 
-  SendGroup(std::string group_id);
+  uint32_t dispatch_interval_;
+  uint32_t heart_beat_interval_;
+  uint32_t load_balance_interval_;
+  uint32_t dispatch_stat_;
+  volatile bool need_balance_;
+  volatile int32_t load_threshold_;
+
+  SendGroup(std::string send_group_key);
   ~SendGroup();
 
   void PreDispatchData(std::error_code error);
@@ -69,14 +79,24 @@ public:
   SendBufferPtrT PopData();
   uint32_t GetQueueSize();
   void UpdateConf(std::error_code error);
-  bool IsConfChanged(ProxyInfoVec &current_proxy_vec,
-                     ProxyInfoVec &new_proxy_vec);
+  bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec);
   bool IsAvailable();
-  uint32_t dispatch_interval_;
-
   void ClearOldTcpClients();
+
+  // balance
+  bool NeedDoLoadBalance();
+  void LoadBalance(std::error_code error);
+  void DoLoadBalance();
+  void HeartBeat();
+  ProxyInfo GetRandomProxy(const std::string &ip = "", uint32_t port = 0);
+  TcpClientTPtrT GetReserveClient();
+  TcpClientTPtrT GetMaxLoadClient();
+  void InitReserveClient();
+  static bool UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end);
+  bool ExistInWorkClient(const std::string &ip, uint32_t port);
 };
 using SendGroupPtr = std::shared_ptr<SendGroup>;
+
 } // namespace inlong
 
 #endif // INLONG_SDK_SEND_GROUP_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index abe33dde21..f2e887d29c 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -181,6 +181,17 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
   }
   groupid_2_cluster_id_update_map_[inlong_group_id] =
       clusterInfo["clusterId"].GetInt();
+
+  // check load
+  int32_t load = 0;
+  if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
+      !clusterInfo["load"].IsNull()) {
+    const rapidjson::Value &obj = clusterInfo["load"];
+    load = obj.GetInt();
+  } else {
+    load = 0;
+  }
+
   // proxy list
   for (auto &proxy : nodeList.GetArray()) {
     std::string ip;
@@ -212,7 +223,7 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
       LOG_WARN("there is no id info of inlong_group_id");
       continue;
     }
-    proxy_info_vec.emplace_back(id, ip, port);
+    proxy_info_vec.emplace_back(id, ip, port, load);
   }
 
   return SdkCode::kSuccess;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index b2af191112..eb17f56313 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -43,6 +43,9 @@ static const uint32_t kMaxStreamIdNum = 100;
 
 static const int32_t kDispatchIntervalZip = 8;
 static const int32_t kDispatchIntervalSend = 10;
+static const int32_t kLoadBalanceInterval = 300000;
+static const int32_t kHeartBeatInterval = 60000;
+static const bool kEnableBalance = true;
 
 static const bool kEnablePack = true;
 static const uint32_t kPackSize = 409600;
@@ -66,6 +69,7 @@ static const char kManagerClusterURL[] =
 static const uint32_t kManagerUpdateInterval = 2;
 static const uint32_t kManagerTimeout = 5;
 static const uint32_t kMaxProxyNum = 8;
+static const uint32_t kReserveProxyNum = 2;
 
 static const bool kEnableTCPNagle = true;
 static const uint32_t kTcpIdleTime = 600000;
@@ -91,6 +95,12 @@ static const uint32_t kMaxAttrLen = 2048;
 const uint32_t ATTR_LENGTH = 10;
 static const bool kEnableIsolation = false;
 
+static const int32_t kDefaultLoadThreshold = 200;
+const uint32_t MAX_STAT = 10000000;
+static const int32_t kWeight[30] = {1,  1,  1,  1,  1,  2,  2,  2,   2,   2,
+                                    3,  3,  3,  3,  3,  6,  6,  6,   6,   6,
+                                    12, 12, 12, 12, 12, 48, 96, 192, 384, 1000};
+
 } // namespace constants
 } // namespace inlong
 #endif // INLONG_SDK_CONSTANT_H
\ No newline at end of file

