This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ac4e5af08 [INLONG-10838][SDK] Optimize the ability to send data for DataProxy C++ SDK (#10850)
9ac4e5af08 is described below

commit 9ac4e5af08812b6b0364a5296f9afc63433cee82
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Aug 21 18:46:39 2024 +0800

    [INLONG-10838][SDK] Optimize the ability to send data for DataProxy C++ SDK (#10850)
---
 .../dataproxy-sdk-cpp/include/inlong_api.h         |  54 +++++
 .../dataproxy-sdk-cpp/src/client/tcp_client.cc     |  10 +-
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |   6 +
 .../dataproxy-sdk-cpp/src/config/sdk_conf.h        |   2 +-
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          |  34 +--
 .../dataproxy-sdk-cpp/src/core/api_imp.h           |  10 +-
 .../dataproxy-sdk-cpp/src/core/inlong_api.cc       |  29 ++-
 .../dataproxy-sdk-cpp/src/core/inlong_api.h        |  44 ++--
 .../dataproxy-sdk-cpp/src/core/sdk_msg.h           |  37 +++-
 .../dataproxy-sdk-cpp/src/group/recv_group.cc      |  10 +-
 .../dataproxy-sdk-cpp/src/group/send_group.cc      | 236 +++++++++------------
 .../dataproxy-sdk-cpp/src/group/send_group.h       |  57 +++--
 .../dataproxy-sdk-cpp/src/protocol/msg_protocol.cc |   4 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   1 +
 .../dataproxy-sdk-cpp/src/utils/send_buffer.h      | 153 +++++++------
 15 files changed, 362 insertions(+), 325 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h
new file mode 100644
index 0000000000..8f79c3b802
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_SDK_API_H
+#define INLONG_SDK_API_H
+
+#include <clocale>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace inlong {
+
+typedef  int (*UserCallBack)(const char *, const char *, const char *, int32_t,
+                             const int64_t, const char *);
+
+class ApiImp;
+
+class InLongApi {
+ public:
+  InLongApi();
+  ~InLongApi();
+  int32_t InitApi(const char *config_path);
+
+  int32_t AddInLongGroupId(const std::vector<std::string> &group_ids);
+
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
+               UserCallBack call_back = nullptr);
+
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
+               int64_t data_time, UserCallBack call_back = nullptr);
+
+  int32_t CloseApi(int32_t max_waitms);
+
+ private:
+  std::shared_ptr<ApiImp> api_impl_;
+};
+}  // namespace inlong
+#endif  // INLONG_SDK_API_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index eb3264ba6c..c3b7692809 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -164,7 +164,7 @@ void TcpClient::BeginWrite() {
   }
   last_update_time_ = Utils::getCurrentMsTime();
   status_ = kWriting;
-  asio::async_write(*socket_, asio::buffer(sendBuffer_->content(), 
sendBuffer_->len()),
+  asio::async_write(*socket_, asio::buffer(sendBuffer_->GetData(), 
sendBuffer_->GetDataLen()),
                     std::bind(&TcpClient::OnWroten, this, 
std::placeholders::_1, std::placeholders::_2));
 }
 void TcpClient::OnWroten(const asio::error_code error, std::size_t 
bytes_transferred) {
@@ -390,8 +390,8 @@ void TcpClient::ParseHeartBeat(size_t total_length) {
 
 void TcpClient::ParseGenericResponse() {
   if (sendBuffer_ != nullptr) {
-    std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + 
sendBuffer_->getStreamId();
-    stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+    std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + 
sendBuffer_->GetInlongStreamId();
+    stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->GetMsgCnt());
     stat_map_[stat_key].AddSendSuccessPackNum(1);
     stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - 
last_update_time_);
 
@@ -448,8 +448,8 @@ void TcpClient::ResetSendBuffer() {
   }
   retry_times_++;
   if (retry_times_ > SdkConfig::getInstance()->retry_times_) {
-    std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + 
sendBuffer_->getStreamId();
-    stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt());
+    std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + 
sendBuffer_->GetInlongStreamId();
+    stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->GetMsgCnt());
     stat_map_[stat_key].AddSendFailPackNum(1);
     stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - 
last_update_time_);
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 12a82b07e7..5f067528af 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -443,6 +443,12 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
   } else {
     retry_times_ = constants::kRetryTimes;
   }
+  if (doc.HasMember("proxy_repeat_times") && doc["proxy_repeat_times"].IsInt() 
&& doc["proxy_repeat_times"].GetInt() >= 0) {
+    const rapidjson::Value &obj = doc["proxy_repeat_times"];
+    proxy_repeat_times_ = obj.GetInt();
+  } else {
+    proxy_repeat_times_ = constants::kProxyRepeatTimes;
+  }
 }
 void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
   // auth settings
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index ae6cfde58b..f120343f07 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -96,7 +96,7 @@ private:
   bool enable_balance_;
   bool enable_local_cache_;
   uint32_t retry_times_;
-
+  uint32_t proxy_repeat_times_;
 
   // auth settings
   bool need_auth_;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 7ae2c79ea2..2109af502e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -46,31 +46,31 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
   return DoInit();
 }
 
-int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char 
*msg, int32_t msg_len,
+int32_t ApiImp::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
                      UserCallBack call_back) {
-  int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+  int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
   if(code !=SdkCode::kSuccess){
     return code;
   }
 
-  return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back);
+  return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, 
call_back);
 }
-int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char 
*msg, int32_t msg_len,
+int32_t ApiImp::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
                      int64_t data_time, UserCallBack call_back) {
-  int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+  int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
   if(code !=SdkCode::kSuccess){
     return code;
   }
 
-  return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back, 
data_time);
+  return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, 
call_back, data_time);
 }
 
-int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, 
const char *msg, int32_t msg_len){
+int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len){
   if (msg_len > max_msg_length_) {
-    MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+    MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, 
inlong_stream_id, 1);
     return SdkCode::kMsgTooLong;
   }
-  if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len 
<= 0) {
+  if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == 
nullptr || msg_len <= 0) {
     return SdkCode::kInvalidInput;
   }
 
@@ -80,9 +80,9 @@ int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, cons
   return SdkCode::kSuccess;
 }
 
-int32_t ApiImp::SendBase(const std::string& inlong_group_id, const 
std::string& stream_id, const std::string& msg,
+int32_t ApiImp::SendBase(const std::string& inlong_group_id, const 
std::string& inlong_stream_id, const std::string& msg,
                          UserCallBack call_back, int64_t report_time) {
-  int32_t check_ret = CheckData(inlong_group_id, stream_id, msg);
+  int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg);
   if (check_ret != SdkCode::kSuccess) {
     return check_ret;
   }
@@ -91,11 +91,11 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string&
 
   auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id);
   if (recv_group == nullptr) {
-    LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << 
",getStreamId:" << stream_id);
+    LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << 
",getStreamId:" << inlong_stream_id);
     return SdkCode::kFailGetRevGroup;
   }
 
-  return recv_group->SendData(msg, inlong_group_id, stream_id, report_time, 
call_back);
+  return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, 
report_time, call_back);
 }
 
 int32_t ApiImp::CloseApi(int32_t max_waitms) {
@@ -125,19 +125,19 @@ int32_t ApiImp::DoInit() {
   return InitManager();
 }
 
-int32_t ApiImp::CheckData(const std::string& group_id, const std::string& 
stream_id, const std::string& msg) {
+int32_t ApiImp::CheckData(const std::string& inlong_group_id, const 
std::string& inlong_stream_id, const std::string& msg) {
   if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
     LOG_ERROR("capi has been closed, Init first and then send");
     return SdkCode::kSendAfterClose;
   }
 
-  if (msg.empty() || group_id.empty() || stream_id.empty()) {
-    LOG_ERROR("invalid input, group id:" << group_id << " stream id:" << 
stream_id << "msg" << msg);
+  if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) {
+    LOG_ERROR("invalid input, group id:" << inlong_group_id << " stream id:" 
<< inlong_stream_id << "msg" << msg);
     return SdkCode::kInvalidInput;
   }
 
   if (msg.size() > SdkConfig::getInstance()->max_msg_size_) {
-    MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+    MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, 
inlong_stream_id, 1);
     LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " 
ext_pack_size"
                                                      << 
SdkConfig::getInstance()->max_msg_size_);
     return SdkCode::kMsgTooLong;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index 32ca093416..cd6a9757a4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -34,10 +34,10 @@ class ApiImp {
   ~ApiImp();
   int32_t InitApi(const char *config_file_path);
 
-  int32_t Send(const char *group_id, const char *stream_id, const char *msg, 
int32_t msg_len,
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
                UserCallBack call_back = nullptr);
 
-  int32_t Send(const char *group_id, const char *stream_id, const char *msg, 
int32_t msg_len,
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
                int64_t report_time, UserCallBack call_back = nullptr);
   int32_t CloseApi(int32_t max_waitms);
 
@@ -46,12 +46,12 @@ class ApiImp {
  private:
   int32_t DoInit();
   int32_t InitManager();
-  int32_t SendBase(const std::string& inlong_group_id, const std::string& 
stream_id, const std::string& msg, UserCallBack call_back,
+  int32_t SendBase(const std::string& inlong_group_id, const std::string& 
inlong_stream_id, const std::string& msg, UserCallBack call_back,
                    int64_t report_time = 0);
 
-  int32_t CheckData(const std::string& group_id, const std::string& stream_id, 
const std::string& msg);
+  int32_t CheckData(const std::string& inlong_group_id, const std::string& 
inlong_stream_id, const std::string& msg);
 
-  int32_t ValidateParams(const char *group_id, const char *stream_id, const 
char *msg, int32_t msg_len);
+  int32_t ValidateParams(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len);
 
   AtomicInt user_exit_flag_{0};
   volatile bool init_flag_ = false;
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
index 14a5f91207..aa0674414d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc
@@ -15,28 +15,25 @@
  * limitations under the License.
  */
 
-#include "inlong_api.h"
+#include "../../include/inlong_api.h"
 #include "../core/api_imp.h"
 namespace inlong {
 
-InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); };
+InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); }
 InLongApi::~InLongApi() { api_impl_->CloseApi(10); }
 
-int32_t InLongApi::InitApi(const char *config_path) {
-  return api_impl_->InitApi(config_path);
-}
+int32_t InLongApi::InitApi(const char *config_path) { return 
api_impl_->InitApi(config_path); }
 
-int32_t InLongApi::Send(const char *inlong_group_id,
-                        const char *inlong_stream_id, const char *msg,
-                        int32_t msg_len, UserCallBack call_back) {
-  return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len,
-                         call_back);
+int32_t InLongApi::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
+                        UserCallBack call_back) {
+  return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, 
call_back);
 }
 
-int32_t InLongApi::CloseApi(int32_t max_waitms) {
-  return api_impl_->CloseApi(max_waitms);
-}
-int32_t InLongApi::AddBid(const std::vector<std::string> &groupids) {
-  return api_impl_->AddInLongGroupId(groupids);
+int32_t InLongApi::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
+                        int64_t data_time, UserCallBack call_back) {
+  return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, 
data_time, call_back);
 }
-} // namespace inlong
\ No newline at end of file
+
+int32_t InLongApi::CloseApi(int32_t max_waitms) { return 
api_impl_->CloseApi(max_waitms); }
+int32_t InLongApi::AddInLongGroupId(const std::vector<std::string> &bids) { 
return api_impl_->AddInLongGroupId(bids); }
+}  // namespace inlong
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
index 66a1a7ce39..88f0a528ff 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h
@@ -1,20 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #ifndef INLONG_SDK_API_H
@@ -34,21 +32,23 @@ typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
 class ApiImp;
 
 class InLongApi {
-public:
+ public:
   InLongApi();
   ~InLongApi();
   int32_t InitApi(const char *config_path);
 
-  int32_t AddBid(const std::vector<std::string> &groupids);
+  int32_t AddGroupId(const std::vector<std::string> &group_ids);
 
-  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
-               const char *msg, int32_t msg_len,
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
                UserCallBack call_back = nullptr);
 
+  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len,
+               int64_t data_time, UserCallBack call_back = nullptr);
+
   int32_t CloseApi(int32_t max_waitms);
 
-private:
+ private:
   std::shared_ptr<ApiImp> api_impl_;
 };
-} // namespace inlong
-#endif // INLONG_SDK_API_H
+}  // namespace inlong
+#endif  // INLONG_SDK_API_H
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
index d9e8366486..69bcdddde8 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-#ifndef SDK_USER_MSG_H_
-#define SDK_USER_MSG_H_
+#ifndef SDK_USER_MSG_H
+#define SDK_USER_MSG_H
 
 #include <functional>
 #include <memory>
@@ -40,27 +40,44 @@ struct SdkMsg {
   std::string inlong_group_id_;
   std::string inlong_stream_id_;
 
-  SdkMsg(const std::string &mmsg, const std::string &mclient_ip,
-         int64_t mreport_time, UserCallBack mcb, const std::string &attr,
-         const std::string &u_ip, int64_t u_time,const std::string& 
inlong_group_id,const std::string& inlong_stream_id)
+  SdkMsg(const std::string &mmsg,
+         const std::string &mclient_ip,
+         int64_t mreport_time,
+         UserCallBack mcb,
+         const std::string &attr,
+         const std::string &u_ip,
+         int64_t u_time,
+         const std::string &inlong_group_id,
+         const std::string &inlong_stream_id)
       : msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time),
         cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip),
         data_pack_format_attr_(attr),
         inlong_group_id_(inlong_group_id),
-        inlong_stream_id_(inlong_stream_id){}
-  SdkMsg(){};
+        inlong_stream_id_(inlong_stream_id) {}
+  SdkMsg() {};
   void setMsg(const std::string &msg) { msg_ = msg; }
   void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; }
   void setReportTime(uint64_t reportTime) { report_time_ = reportTime; }
-  void setCb(UserCallBack cb) { cb_ = cb;}
+  void setCb(UserCallBack cb) { cb_ = cb; }
   void setUserReportTime(uint64_t userReportTime) { user_report_time_ = 
userReportTime; }
   void setUserClientIp(const std::string &userClientIp) { user_client_ip_ = 
userClientIp; }
   void setDataPackFormatAttr(const std::string &dataPackFormatAttr) { 
data_pack_format_attr_ = dataPackFormatAttr; }
   void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ = 
inlong_group_id; }
   void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = 
inlong_stream_id; }
+
+  void clear() {
+    msg_ = "";
+    client_ip_ = "";
+    report_time_ = 0;
+    cb_ = nullptr;
+    user_report_time_ = 0;
+    user_client_ip_ = "";
+    data_pack_format_attr_ = "";
+    inlong_group_id_ = "";
+    inlong_stream_id_ = "";
+  }
 };
 using SdkMsgPtr = std::shared_ptr<SdkMsg>;
 
 } // namespace inlong
-
-#endif // SDK_USER_MSG_H_
\ No newline at end of file
+#endif // SDK_USER_MSG_H
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 377432a2ef..1ffeeef8dc 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -352,15 +352,15 @@ std::shared_ptr<SendBuffer> RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs
     uniq_id_ = 0;
   }
 
-  if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) {
+  if (!PackMsg(msgs, send_buffer->GetData(), len) || len == 0) {
     LOG_ERROR("failed to write data to send buf from pack queue, sendQueue");
     return nullptr;
   }
 
-  send_buffer->setLen(len);
-  send_buffer->setMsgCnt(msg_cnt);
-  send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
-  send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
+  send_buffer->SetDataLen(len);
+  send_buffer->SetMsgCnt(msg_cnt);
+  send_buffer->SetInlongGroupId(msgs[0]->inlong_group_id_);
+  send_buffer->SetInlongStreamId(msgs[0]->inlong_stream_id_);
   for (const auto &it : msgs) {
     if(it->cb_){
       send_buffer->addUserMsg(it);
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index f317163799..0bbedcd0fb 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -1,45 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #include "send_group.h"
-#include "api_code.h"
-#include "proxy_manager.h"
+#include <sys/prctl.h>
 #include <algorithm>
 #include <random>
+#include "../core/api_code.h"
+#include "../manager/proxy_manager.h"
 
 namespace inlong {
-const int kDefaultQueueSize = 20;
+const uint32_t kDefaultQueueSize = 200;
 SendGroup::SendGroup(std::string send_group_key)
     : work_(asio::make_work_guard(io_context_)),
-      send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0),
-      load_threshold_(0), max_proxy_num_(0) {
-  max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ /
-                        SdkConfig::getInstance()->pack_size_;
-  if (max_send_queue_num_ <= 0) {
+      send_group_key_(send_group_key),
+      send_idx_(0),
+      dispatch_stat_(0),
+      load_threshold_(0),
+      max_proxy_num_(0) {
+  max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / 
SdkConfig::getInstance()->pack_size_;
+  if (max_send_queue_num_ <= kDefaultQueueSize) {
     max_send_queue_num_ = kDefaultQueueSize;
   }
-  LOG_INFO("SendGroup:" << send_group_key_
-                         << ",max send queue num:" << max_send_queue_num_);
+  max_send_queue_num_ = std::max(max_send_queue_num_, kDefaultQueueSize);
+  LOG_INFO("SendGroup:" << send_group_key_ << ",max send queue num:" << 
max_send_queue_num_);
   dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
   load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
-  heart_beat_interval_ =
-      SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_;
+  heart_beat_interval_ = SdkConfig::getInstance()->heart_beat_interval_ / 
dispatch_interval_;
   need_balance_ = SdkConfig::getInstance()->enable_balance_;
 
   work_clients_old_ = nullptr;
@@ -48,23 +48,19 @@ SendGroup::SendGroup(std::string send_group_key)
 
   send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
   send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
-  send_timer_->async_wait(
-      std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1));
+  send_timer_->async_wait(std::bind(&SendGroup::PreDispatchData, this, 
std::placeholders::_1));
 
   update_conf_timer_ = std::make_shared<asio::steady_timer>(io_context_);
   update_conf_timer_->expires_after(std::chrono::milliseconds(1));
-  update_conf_timer_->async_wait(
-      std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+  update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, 
std::placeholders::_1));
 
   if (SdkConfig::getInstance()->enable_balance_) {
     load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_);
-    load_balance_timer_->expires_after(
-        std::chrono::milliseconds(load_balance_interval_));
-    load_balance_timer_->async_wait(
-        std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+    
load_balance_timer_->expires_after(std::chrono::milliseconds(load_balance_interval_));
+    load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, 
std::placeholders::_1));
   }
 
-  current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
+  current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
   thread_ = std::thread(&SendGroup::Run, this);
 }
 SendGroup::~SendGroup() {
@@ -84,14 +80,16 @@ SendGroup::~SendGroup() {
     thread_.join();
   }
 }
-void SendGroup::Run() { io_context_.run(); }
+void SendGroup::Run() {
+  prctl(PR_SET_NAME, "send-group");
+  io_context_.run();
+}
 void SendGroup::PreDispatchData(std::error_code error) {
   if (error) {
     return;
   }
   send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
-  send_timer_->async_wait(
-      std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+  send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, 
std::placeholders::_1));
 }
 
 void SendGroup::DispatchData(std::error_code error) {
@@ -125,8 +123,7 @@ void SendGroup::DispatchData(std::error_code error) {
   }
 
   send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
-  send_timer_->async_wait(
-      std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+  send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, 
std::placeholders::_1));
 }
 void SendGroup::HeartBeat() {
   unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
@@ -148,12 +145,12 @@ void SendGroup::HeartBeat() {
 
 bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; }
 
-uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) {
+uint32_t SendGroup::PushData(const SendBufferPtrT &send_buffer_ptr) {
   if (IsFull()) {
     return SdkCode::kSendBufferFull;
   }
   std::lock_guard<std::mutex> lock(mutex_);
-  send_buf_list_.push(send_buffer_ptr);
+  send_proxy_list_.push(send_buffer_ptr);
   return SdkCode::kSuccess;
 }
 
@@ -165,60 +162,53 @@ void SendGroup::UpdateConf(std::error_code error) {
 
   ClearOldTcpClients();
 
-  ProxyInfoVec new_bus_info;
-  if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) !=
-          kSuccess ||
-      new_bus_info.empty()) {
+  ProxyInfoVec new_proxy_info;
+  if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_proxy_info) != kSuccess || new_proxy_info.empty()) {
     update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
-    update_conf_timer_->async_wait(
-        std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+    update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
     return;
   }
-  if (new_bus_info.empty()) {
-    LOG_INFO("UpdateConf new_bus_info is empty!");
+  if (new_proxy_info.empty()) {
+    LOG_INFO("New proxy is empty when update config!");
     return;
   }
 
-  load_threshold_ = new_bus_info[0].GetLoad() > constants::kDefaultLoadThreshold
-                        ? constants::kDefaultLoadThreshold
-                        : std::max((new_bus_info[0].GetLoad()), 0);
+  load_threshold_ = new_proxy_info[0].GetLoad() > constants::kDefaultLoadThreshold
+                    ? constants::kDefaultLoadThreshold
+                    : std::max((new_proxy_info[0].GetLoad()), 0);
 
-  if (!IsConfChanged(current_bus_vec_, new_bus_info)) {
-    LOG_INFO("Don`t need UpdateConf. current bus size("
-             << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size()
-             << ")");
+  if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
+    LOG_INFO("Don`t need UpdateConf. current proxy size(" << current_proxy_vec_.size() << ")=proxy size("
+                                                        << new_proxy_info.size() << ")");
     update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
-    update_conf_timer_->async_wait(
-        std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+    update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
     return;
   }
 
-  max_proxy_num_ =
-      std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size());
+  max_proxy_num_ = std::min(SdkConfig::getInstance()->max_proxy_num_, new_proxy_info.size());
 
-  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
-      std::make_shared<std::vector<TcpClientTPtrT>>();
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp = std::make_shared<std::vector<TcpClientTPtrT>>();
   if (tcp_clients_tmp == nullptr) {
-    LOG_INFO("tcp_clients_tmp is  nullptr");
+    LOG_INFO("Tcp clients tmp is nullptr");
     update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
-    update_conf_timer_->async_wait(
-        std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+    update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
     return;
   }
 
-  std::random_shuffle(new_bus_info.begin(), new_bus_info.end());
+  std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end());
 
   tcp_clients_tmp->reserve(max_proxy_num_);
   for (int i = 0; i < max_proxy_num_; i++) {
-    ProxyInfo bus_tmp = new_bus_info[i];
-    TcpClientTPtrT tcpClientTPtrT =
-        std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
-    tcp_clients_tmp->push_back(tcpClientTPtrT);
-    LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]");
+    ProxyInfo tmp_proxy = new_proxy_info[i];
+    for (int repeat_time = 0; repeat_time < SdkConfig::getInstance()->proxy_repeat_times_; repeat_time++) {
+      TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_, tmp_proxy.ip(), tmp_proxy.port());
+      tcp_clients_tmp->push_back(tcpClientTPtrT);
+      LOG_INFO("New proxy info.[" << tmp_proxy.ip() << ":" << tmp_proxy.port() << "]");
+    }
   }
 
   {
-    LOG_INFO("do change tcp clients.");
+    LOG_INFO("Do change tcp clients.");
     unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_);
     work_clients_old_ = work_clients_;
     work_clients_ = tcp_clients_tmp;
@@ -230,51 +220,43 @@ void SendGroup::UpdateConf(std::error_code error) {
     }
   }
 
-  current_bus_vec_ = new_bus_info;
+  current_proxy_vec_ = new_proxy_info;
 
   InitReserveClient();
 
   update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
-  update_conf_timer_->async_wait(
-      std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+  update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
 
-  LOG_INFO("Finished UpdateConf.");
+  LOG_INFO("Finished update send group config.");
 }
 
 SendBufferPtrT SendGroup::PopData() {
   std::lock_guard<std::mutex> lock(mutex_);
-  if (send_buf_list_.empty()) {
+  if (send_proxy_list_.empty()) {
     return nullptr;
   }
-  SendBufferPtrT send_buf = send_buf_list_.front();
-  send_buf_list_.pop();
+  SendBufferPtrT send_buf = send_proxy_list_.front();
+  send_proxy_list_.pop();
   return send_buf;
 }
 
 uint32_t SendGroup::GetQueueSize() {
   std::lock_guard<std::mutex> lock(mutex_);
-  return send_buf_list_.size();
+  return send_proxy_list_.size();
 }
 
-bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec,
-                              ProxyInfoVec &new_bus_vec) {
-  if (new_bus_vec.empty())
-    return false;
-  if (current_bus_vec.size() != new_bus_vec.size()) {
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec &new_proxy_vec) {
+  if (new_proxy_vec.empty()) return false;
+  if (current_proxy_vec.size() != new_proxy_vec.size()) {
     return true;
   }
 
-  for (auto &current_bu : current_bus_vec) {
-    for (int i = 0; i < new_bus_vec.size(); i++) {
-      if ((current_bu.ip() == new_bus_vec[i].ip()) &&
-          (current_bu.port() == new_bus_vec[i].port()))
-        break;
-      if (i == (new_bus_vec.size() - 1)) {
-        if ((current_bu.ip() != new_bus_vec[i].ip() ||
-             current_bu.port() == new_bus_vec[i].port())) {
-          LOG_INFO("current bus ip." << current_bu.ip() << ":"
-                                     << current_bu.port()
-                                     << " can`t find in bus.");
+  for (auto &current_bu : current_proxy_vec) {
+    for (int i = 0; i < new_proxy_vec.size(); i++) {
+      if ((current_bu.ip() == new_proxy_vec[i].ip()) && (current_bu.port() == new_proxy_vec[i].port())) break;
+      if (i == (new_proxy_vec.size() - 1)) {
+        if ((current_bu.ip() != new_proxy_vec[i].ip() || current_bu.port() == new_proxy_vec[i].port())) {
+          LOG_INFO("current proxy ip." << current_bu.ip() << ":" << current_bu.port() << " can`t find in proxy.");
           return true;
         }
       }
@@ -312,8 +294,7 @@ void SendGroup::LoadBalance(std::error_code error) {
   uint64_t interval = load_balance_interval_ + rand() % 120 * 1000;
   LOG_INFO("LoadBalance interval:" << interval);
   load_balance_timer_->expires_after(std::chrono::milliseconds(interval));
-  load_balance_timer_->async_wait(
-      std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+  load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
 }
 
 void SendGroup::DoLoadBalance() {
@@ -328,17 +309,13 @@ void SendGroup::DoLoadBalance() {
     return;
   }
 
-  if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) >
-      load_threshold_) {
-    LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace"
-                              << work_client->getClientInfo() << ",load[work "
-                              << work_client->GetAvgLoad() << "][reserve "
-                              << reserve_client->GetAvgLoad() << "][threshold "
-                              << load_threshold_ << "]");
+  if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > load_threshold_) {
+    LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" << work_client->getClientInfo()
+                              << ",load[work " << work_client->GetAvgLoad() << "][reserve "
+                              << reserve_client->GetAvgLoad() << "][threshold " << load_threshold_ << "]");
     std::string ip = work_client->getIp();
     uint32_t port = work_client->getPort();
-    work_client->UpdateClient(reserve_client->getIp(),
-                              reserve_client->getPort());
+    work_client->UpdateClient(reserve_client->getIp(), reserve_client->getPort());
 
     ProxyInfo proxy = GetRandomProxy(ip, port);
     if (!proxy.ip().empty()) {
@@ -350,12 +327,10 @@ void SendGroup::DoLoadBalance() {
 }
 bool SendGroup::NeedDoLoadBalance() {
   unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
-  if (load_threshold_ <= 0 ||
-      work_clients_->size() == current_bus_vec_.size()) {
-    LOG_INFO("Don`t need DoLoadBalance [load_threshold]:"
-             << load_threshold_
-             << ",[tcp_client size]:" << work_clients_->size()
-             << ",[current_bus_vec size]:" << current_bus_vec_.size());
+  if (load_threshold_ <= 0 || work_clients_->size() == current_proxy_vec_.size()) {
+    LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" << load_threshold_
+                                                          << ",[tcp_client size]:" << work_clients_->size()
+                                                          << ",[current_proxy_vec size]:" << current_proxy_vec_.size());
     need_balance_ = false;
     return false;
   }
@@ -363,12 +338,11 @@ bool SendGroup::NeedDoLoadBalance() {
   return true;
 }
 void SendGroup::InitReserveClient() {
-  if (max_proxy_num_ >= current_bus_vec_.size()) {
+  if (max_proxy_num_ >= current_proxy_vec_.size()) {
     return;
   }
-  uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_;
-  uint64_t reserve_num =
-      std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
+  uint64_t max_reserve_num = current_proxy_vec_.size() - max_proxy_num_;
+  uint64_t reserve_num = std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
   if (reserve_num <= 0) {
     return;
   }
@@ -376,15 +350,12 @@ void SendGroup::InitReserveClient() {
   unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_);
   reserve_clients_.clear();
 
-  for (uint64_t i = current_bus_vec_.size() - reserve_num;
-       i < current_bus_vec_.size(); i++) {
-    ProxyInfo bus_tmp = current_bus_vec_[i];
-    TcpClientTPtrT tcpClientTPtrT =
-        std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
+  for (uint64_t i = current_proxy_vec_.size() - reserve_num; i < current_proxy_vec_.size(); i++) {
+    ProxyInfo tmp_proxy = current_proxy_vec_[i];
+    TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_, tmp_proxy.ip(), tmp_proxy.port());
     reserve_clients_.push_back(tcpClientTPtrT);
   }
-  LOG_INFO(
-      "InitReserveClient reserve_clients size:" << reserve_clients_.size());
+  LOG_INFO("InitReserveClient reserve_clients size:" << reserve_clients_.size());
 }
 bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end) {
   if (begin && end) {
@@ -409,14 +380,13 @@ TcpClientTPtrT SendGroup::GetMaxLoadClient() {
 
 ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) {
   ProxyInfo proxy_info;
-  for (auto &it : current_bus_vec_) {
+  for (auto &it : current_proxy_vec_) {
     if (it.ip() == ip && it.port() == port) {
       continue;
     }
     bool exist = false;
     for (int index = 0; index < reserve_clients_.size(); index++) {
-      if (it.ip() == reserve_clients_[index]->getIp() &&
-          it.port() == reserve_clients_[index]->getPort()) {
+      if (it.ip() == reserve_clients_[index]->getIp() && it.port() == reserve_clients_[index]->getPort()) {
         exist = true;
         break;
       }
@@ -444,8 +414,7 @@ TcpClientTPtrT SendGroup::GetReserveClient() {
     }
     bool exist = false;
     for (int index = 0; index < work_clients_->size(); index++) {
-      if (it->getIp() == (*work_clients_)[index]->getIp() &&
-          it->getPort() == (*work_clients_)[index]->getPort()) {
+      if (it->getIp() == (*work_clients_)[index]->getIp() && it->getPort() == (*work_clients_)[index]->getPort()) {
         exist = true;
         break;
       }
@@ -461,11 +430,10 @@ TcpClientTPtrT SendGroup::GetReserveClient() {
 bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) {
   unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
   for (int index = 0; index < work_clients_->size(); index++) {
-    if (ip == (*work_clients_)[index]->getIp() &&
-        port == (*work_clients_)[index]->getPort()) {
+    if (ip == (*work_clients_)[index]->getIp() && port == (*work_clients_)[index]->getPort()) {
       return true;
     }
   }
   return false;
 }
-} // namespace inlong
+}  // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
index a52d63dcd3..c54f858b90 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -1,53 +1,53 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #ifndef INLONG_SDK_SEND_GROUP_H
 #define INLONG_SDK_SEND_GROUP_H
 
-#include "../client/tcp_client.h"
+#include <queue>
+#include <unordered_map>
+
 #include "../config/proxy_info.h"
 #include "../utils/send_buffer.h"
-#include <queue>
+#include "../client/tcp_client.h"
+
 namespace inlong {
 const int kTimerMiSeconds = 10;
 const int kTimerMinute = 60000;
 
 using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
 using IOContext = asio::io_context;
-using io_context_work =
-    asio::executor_work_guard<asio::io_context::executor_type>;
+using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
 
 class SendGroup : noncopyable {
-private:
+ private:
   IOContext io_context_;
-  io_context_work work_;
+  io_context_work work_;  // 保持io_context.run()在无任何任务时不退出
   std::thread thread_;
   void Run();
   uint64_t max_proxy_num_;
 
-public:
+ public:
   std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_;
   std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_;
   std::vector<TcpClientTPtrT> reserve_clients_;
 
-  ProxyInfoVec current_bus_vec_;
-  std::queue<SendBufferPtrT> send_buf_list_;
+  ProxyInfoVec current_proxy_vec_;
+  std::queue<SendBufferPtrT> send_proxy_list_;
 
   SteadyTimerPtr send_timer_;
   SteadyTimerPtr update_conf_timer_;
@@ -75,11 +75,11 @@ public:
   void PreDispatchData(std::error_code error);
   void DispatchData(std::error_code error);
   bool IsFull();
-  uint32_t PushData(SendBufferPtrT send_buffer_ptr);
+  uint32_t PushData(const SendBufferPtrT &send_buffer_ptr);
   SendBufferPtrT PopData();
   uint32_t GetQueueSize();
   void UpdateConf(std::error_code error);
-  bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec);
+  bool IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec &new_proxy_vec);
   bool IsAvailable();
   void ClearOldTcpClients();
 
@@ -96,7 +96,6 @@ public:
   bool ExistInWorkClient(const std::string &ip, uint32_t port);
 };
 using SendGroupPtr = std::shared_ptr<SendGroup>;
+}  // namespace inlong
 
-} // namespace inlong
-
-#endif // INLONG_SDK_SEND_GROUP_H
+#endif  // INLONG_SDK_SEND_GROUP_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
index 28bfed473c..8754d60ab2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc
@@ -29,9 +29,7 @@ char APIEncode::recvBuf[APIEncode::kRecvLen] = {0};
 
 void APIEncode::decodeProtocoMsg(SendBuffer *buf) {
   memset(recvBuf, 0x0, kRecvLen);
-  // //LOG_DEBUG("print buf content, %s", buf->content());
-  memcpy(recvBuf, buf->content(), buf->len());
-  // memcpy(recvBuf, buf->content(), buf->len());
+  memcpy(recvBuf, buf->GetData(), buf->GetDataLen());
 
   char *p = recvBuf;
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 47282e0e32..a1753b8255 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -83,6 +83,7 @@ static const uint32_t kTcpDetectionInterval = 60000;
 static const uint32_t kMaxRetryIntervalMs= 3000;
 static const uint32_t kRetryIntervalMs= 200;
 static const int32_t kRetryTimes = 1;
+static const uint32_t kProxyRepeatTimes = 1;
 
 static const char kSerIP[] = "127.0.0.1";
 static const uint32_t kSerPort = 46801;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 5b7e5a596a..4b2ca778ba 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -1,123 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-#ifndef INLONG_SDK_SEND_BUFFER_H
-#define INLONG_SDK_SEND_BUFFER_H
+#ifndef INLONG_SEND_BUFFER_H
+#define INLONG_SEND_BUFFER_H
 
 #include <mutex>
 #include <string>
+#include <deque>
+#include <queue>
 
+#include "asio.hpp"
 #include "atomic.h"
 #include "logger.h"
 #include "noncopyable.h"
-#include "sdk_msg.h"
-#include <asio.hpp>
-#include <deque>
-#include <queue>
+
+#include "../core/sdk_msg.h"
+#include "../manager/msg_manager.h"
 
 namespace inlong {
-class Connection;
-using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
-using ConnectionPtr = std::shared_ptr<Connection>;
 class SendBuffer : noncopyable {
-private:
-  uint32_t uniq_id_;
-  std::atomic<bool> is_used_;
-  std::atomic<bool> is_packed_;
-  char *content_;
-  uint32_t size_;
-  int32_t msg_cnt_;
-  uint32_t len_;
+ private:
+  char *data_;
+  uint32_t data_len_;
+  uint32_t msg_cnt_;
   std::string inlong_group_id_;
   std::string inlong_stream_id_;
-  AtomicInt already_send_;
-  uint64_t first_send_time_;
-  uint64_t latest_send_time_;
   std::vector<SdkMsgPtr> user_msg_vector_;
 
-public:
-  SendBuffer(uint32_t size)
-      : uniq_id_(0), is_used_(false), is_packed_(false), size_(size),
-        msg_cnt_(0), len_(0), inlong_group_id_(), inlong_stream_id_(),
-        first_send_time_(0), latest_send_time_(0) {
-    content_ = new char[size];
-    if (content_) {
-      memset(content_, 0x0, size);
+ public:
+  SendBuffer(uint32_t size) : msg_cnt_(0), data_len_(0), inlong_group_id_(), inlong_stream_id_() {
+    data_ = new char[size];
+    if (data_) {
+      memset(data_, 0x0, size);
     }
   }
   ~SendBuffer() {
-    if (content_) {
-      delete[] content_;
+    if (data_) {
+      delete[] data_;
     }
   }
-
-  char *content() { return content_; }
-  int32_t msgCnt() const { return msg_cnt_; }
-  void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; }
-  uint32_t len() { return len_; }
-  void setLen(const uint32_t len) { len_ = len; }
-  std::string getGroupId() { return inlong_group_id_; }
-  std::string getStreamId() { return inlong_stream_id_; }
-  void setInlongGroupId(const std::string &inlong_group_id) {
+  char *GetData() const {
+    return data_;
+  }
+  void SetData(char *data) {
+    data_ = data;
+  }
+  uint32_t GetDataLen() const {
+    return data_len_;
+  }
+  void SetDataLen(uint32_t data_len) {
+    data_len_ = data_len;
+  }
+  uint32_t GetMsgCnt() const {
+    return msg_cnt_;
+  }
+  void SetMsgCnt(uint32_t msg_cnt) {
+    msg_cnt_ = msg_cnt;
+  }
+  const std::string &GetInlongGroupId() const {
+    return inlong_group_id_;
+  }
+  void SetInlongGroupId(const std::string &inlong_group_id) {
     inlong_group_id_ = inlong_group_id;
   }
-  void setStreamId(const std::string &inlong_stream_id) {
+  const std::string &GetInlongStreamId() const {
+    return inlong_stream_id_;
+  }
+  void SetInlongStreamId(const std::string &inlong_stream_id) {
     inlong_stream_id_ = inlong_stream_id;
   }
 
-  void setUniqId(const uint32_t &uniq_id) { uniq_id_ = uniq_id; }
+  void addUserMsg(const SdkMsgPtr &msg) { user_msg_vector_.push_back(msg); }
 
-  void addUserMsg(SdkMsgPtr msg) { user_msg_vector_.push_back(msg); }
   void doUserCallBack() {
     for (auto it : user_msg_vector_) {
       if (it->cb_) {
-        it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
-                it->msg_.data(), it->msg_.size(), it->user_report_time_,
-                it->user_client_ip_.data());
+        it->cb_(it->inlong_group_id_.data(),
+                it->inlong_stream_id_.data(),
+                it->msg_.data(),
+                it->msg_.size(),
+                it->report_time_,
+                it->client_ip_.data());
       }
     }
   }
 
   void releaseBuf() {
-    if (!is_used_) {
-      return;
-    }
-    uniq_id_ = 0;
-    is_used_ = false;
-    is_packed_ = false;
-    memset(content_, 0x0, size_);
     msg_cnt_ = 0;
-    len_ = 0;
+    data_len_ = 0;
     inlong_group_id_ = "";
     inlong_stream_id_ = "";
-    already_send_.getAndSet(0);
-    first_send_time_ = 0;
-    latest_send_time_ = 0;
+    for (const auto &it : user_msg_vector_) {
+      if (it->cb_) {
+        it->clear();
+      }
+    }
+    MsgManager::GetInstance()->AddMsg(user_msg_vector_);
     user_msg_vector_.clear();
-    AtomicInt fail_create_conn_;
-    fail_create_conn_.getAndSet(0);
+    user_msg_vector_.shrink_to_fit();
   }
-
-  void setIsPacked(bool is_packed) { is_packed_ = is_packed; }
 };
 typedef std::shared_ptr<SendBuffer> SendBufferPtrT;
-typedef std::queue<SendBufferPtrT> SendBufferPtrDeque;
-} // namespace inlong
+}  // namespace inlong
 
-#endif // INLONG_SDK_SEND_BUFFER_H
\ No newline at end of file
+#endif  // INLONG_SEND_BUFFER_H
\ No newline at end of file

Reply via email to