This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fb395fe0e0 [INLONG-10821][SDK] Optimize the ability to receive data for DataProxy C++ SDK (#10835)
fb395fe0e0 is described below

commit fb395fe0e0a44c1ade30f084c138aeaf7232e2bf
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Aug 21 14:01:34 2024 +0800

    [INLONG-10821][SDK] Optimize the ability to receive data for DataProxy C++ SDK (#10835)
---
 .../dataproxy-sdk-cpp/src/client/tcp_client.cc     | 246 +++++++-------
 .../dataproxy-sdk-cpp/src/client/tcp_client.h      |  71 ++--
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |   7 +
 .../dataproxy-sdk-cpp/src/config/sdk_conf.h        |   1 +
 .../dataproxy-sdk-cpp/src/core/api_code.h          |   3 +-
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          | 104 +++---
 .../dataproxy-sdk-cpp/src/core/api_imp.h           |  38 +--
 .../dataproxy-sdk-cpp/src/core/sdk_msg.h           |  11 +-
 .../dataproxy-sdk-cpp/src/group/recv_group.cc      | 365 +++++++++++----------
 .../dataproxy-sdk-cpp/src/group/recv_group.h       |  77 +++--
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc |   6 +-
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |   2 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   5 +
 .../dataproxy-sdk-cpp/src/utils/send_buffer.h      |   4 +-
 14 files changed, 511 insertions(+), 429 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index e27910c4e9..eb3264ba6c 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -1,47 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #include "tcp_client.h"
-#include "../utils/utils.h"
-#include "api_code.h"
+
 #include <utility>
 
+#include "../manager/buffer_manager.h"
+#include "../manager/metric_manager.h"
+#include "../utils/utils.h"
+
 namespace inlong {
 #define CLIENT_INFO client_info_ << "[" << status_ << "]"
 TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port)
     : socket_(std::make_shared<asio::ip::tcp::socket>(io_context)),
       wait_timer_(std::make_shared<asio::steady_timer>(io_context)),
       keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)),
-      ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), 
port),
-      status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false),
-      proxy_loads_(30), wait_heart_beat_(false), reset_client_(false),
-      heart_beat_index_(0), only_heart_heat_(false) {
+      ip_(ip),
+      port_(port),
+      endpoint_(asio::ip::address::from_string(ip), port),
+      status_(kUndefined),
+      recv_buf_(new BlockMemory()),
+      exit_(false),
+      proxy_loads_(30),
+      wait_heart_beat_(false),
+      reset_client_(false),
+      heart_beat_index_(0),
+      only_heart_heat_(false),
+      need_retry_(false),
+      retry_times_(0) {
   client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
 
   tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_;
   tcp_idle_time_ = SdkConfig::getInstance()->tcp_idle_time_;
   last_update_time_ = Utils::getCurrentMsTime();
 
-  keep_alive_timer_->expires_after(
-      std::chrono::milliseconds(tcp_detection_interval_));
-  keep_alive_timer_->async_wait(
-      std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
+  
keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_));
+  keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, 
std::placeholders::_1));
 
   LOG_INFO("TcpClient At remote info .status:" << status_ << client_info_);
   AsyncConnect();
@@ -70,7 +78,7 @@ TcpClient::~TcpClient() {
 void TcpClient::DoClose() {
   status_ = kStopped;
   exit_ = true;
-  LOG_INFO("closed client." << CLIENT_INFO);
+  LOG_INFO("Closed client." << CLIENT_INFO);
 }
 
 void TcpClient::AsyncConnect() {
@@ -87,13 +95,12 @@ void TcpClient::AsyncConnect() {
       }
     }
     status_ = kConnecting;
-    LOG_INFO("began to connect." << CLIENT_INFO);
+    LOG_INFO("Began to connect." << CLIENT_INFO);
   } catch (std::exception &e) {
     LOG_ERROR("AsyncConnect exception." << e.what() << CLIENT_INFO);
   }
 
-  socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this,
-                                              std::placeholders::_1));
+  socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this, 
std::placeholders::_1));
 }
 
 void TcpClient::DoAsyncConnect(asio::error_code error) {
@@ -116,7 +123,12 @@ void TcpClient::OnConnected(asio::error_code error) {
     socket_->set_option(asio::ip::tcp::no_delay(true));
     asio::socket_base::keep_alive option(true);
     socket_->set_option(option);
-    LOG_INFO("client has connected." << CLIENT_INFO);
+    LOG_INFO("Client has connected." << CLIENT_INFO);
+    if (need_retry_) {
+      LOG_WARN("Client has connected retry! times:" << retry_times_ << 
CLIENT_INFO);
+      write(sendBuffer_, true);
+      return;
+    }
     status_ = kFree;
     return;
   }
@@ -124,22 +136,24 @@ void TcpClient::OnConnected(asio::error_code error) {
     return;
   }
   status_ = kConnectFailed;
-  LOG_ERROR("connect has error:" << error.message() << CLIENT_INFO);
+  LOG_ERROR("Connect has error:" << error.message() << CLIENT_INFO);
+  if (need_retry_) {
+    ResetSendBuffer();
+  }
   wait_timer_->expires_after(std::chrono::milliseconds(kConnectTimeout));
-  wait_timer_->async_wait(
-      std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1));
+  wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, 
std::placeholders::_1));
 }
 
-void TcpClient::write(SendBufferPtrT sendBuffer) {
+void TcpClient::write(SendBufferPtrT sendBuffer, bool retry) {
   if (kStopped == status_ || exit_) {
     LOG_ERROR("Stop.At." << CLIENT_INFO);
     return;
   }
-  if (status_ != kFree) {
+  if (status_ != kFree && !retry) {
     LOG_WARN("Not free ." << CLIENT_INFO);
     return;
   }
-  sendBuffer_ = sendBuffer;
+  sendBuffer_ = std::move(sendBuffer);
   BeginWrite();
 }
 
@@ -150,13 +164,10 @@ void TcpClient::BeginWrite() {
   }
   last_update_time_ = Utils::getCurrentMsTime();
   status_ = kWriting;
-  asio::async_write(*socket_,
-                    asio::buffer(sendBuffer_->content(), sendBuffer_->len()),
-                    std::bind(&TcpClient::OnWroten, this, 
std::placeholders::_1,
-                              std::placeholders::_2));
+  asio::async_write(*socket_, asio::buffer(sendBuffer_->content(), 
sendBuffer_->len()),
+                    std::bind(&TcpClient::OnWroten, this, 
std::placeholders::_1, std::placeholders::_2));
 }
-void TcpClient::OnWroten(const asio::error_code error,
-                         std::size_t bytes_transferred) {
+void TcpClient::OnWroten(const asio::error_code error, std::size_t 
bytes_transferred) {
   if (kStopped == status_ || exit_) {
     return;
   }
@@ -164,14 +175,14 @@ void TcpClient::OnWroten(const asio::error_code error,
     if (asio::error::operation_aborted == error) {
       return;
     }
-    LOG_ERROR("write error:" << error.message() << CLIENT_INFO);
+    LOG_ERROR("Write error:" << error.message() << CLIENT_INFO);
     status_ = kWriting;
     HandleFail();
     return;
   }
 
   if (0 == bytes_transferred) {
-    LOG_ERROR("transferred 0 bytes." << CLIENT_INFO);
+    LOG_ERROR("Transferred 0 bytes." << CLIENT_INFO);
     status_ = kWaiting;
     HandleFail();
     return;
@@ -179,8 +190,7 @@ void TcpClient::OnWroten(const asio::error_code error,
 
   status_ = kClientResponse;
   asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)),
-                   std::bind(&TcpClient::OnReturn, this, std::placeholders::_1,
-                             std::placeholders::_2));
+                   std::bind(&TcpClient::OnReturn, this, 
std::placeholders::_1, std::placeholders::_2));
 }
 void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
   if (kStopped == status_ || exit_) {
@@ -201,18 +211,15 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) {
     HandleFail();
     return;
   }
-  size_t resp_len =
-      ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data));
+  size_t resp_len = ntohl(*reinterpret_cast<const uint32_t 
*>(recv_buf_->m_data));
 
   if (resp_len > recv_buf_->m_max_size) {
     status_ = kWaiting;
     HandleFail();
     return;
   }
-  asio::async_read(*socket_,
-                   asio::buffer(recv_buf_->m_data + sizeof(uint32_t), 
resp_len),
-                   std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
-                             std::placeholders::_2));
+  asio::async_read(*socket_, asio::buffer(recv_buf_->m_data + 
sizeof(uint32_t), resp_len),
+                   std::bind(&TcpClient::OnBody, this, std::placeholders::_1, 
std::placeholders::_2));
 }
 
 void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {
@@ -230,17 +237,19 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) {
     return;
   }
   uint32_t parse_index = sizeof(uint32_t);
-  uint8_t msg_type =
-      *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+  uint8_t msg_type = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + 
parse_index);
 
   switch (msg_type) {
-  case 8:
-    ParseHeartBeat(bytesTransferred);
-    break;
-  default:
-    ParseGenericResponse();
-    break;
+    case 8:
+      ParseHeartBeat(bytesTransferred);
+      break;
+    default:
+      ParseGenericResponse();
+      break;
   }
+
+  ResetRetry();
+
   if (wait_heart_beat_) {
     HeartBeat();
     wait_heart_beat_ = false;
@@ -261,17 +270,12 @@ void TcpClient::HandleFail() {
   }
 
   status_ = kConnecting;
-  if (sendBuffer_ != nullptr) {
-    stat_.AddSendFailMsgNum(sendBuffer_->msgCnt());
-    stat_.AddSendFailPackNum(1);
+  ResetSendBuffer();
 
-    stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+  int retry_interval = std::min(retry_times_ * constants::kRetryIntervalMs, 
constants::kMaxRetryIntervalMs);
 
-    sendBuffer_->doUserCallBack();
-    sendBuffer_->releaseBuf();
-  }
-
-  AsyncConnect();
+  wait_timer_->expires_after(std::chrono::milliseconds(retry_interval));
+  wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, 
std::placeholders::_1));
 }
 
 void TcpClient::DetectStatus(const asio::error_code error) {
@@ -282,23 +286,28 @@ void TcpClient::DetectStatus(const asio::error_code error) {
     return;
   }
   if (!only_heart_heat_) {
-    LOG_INFO(stat_.ToString() << CLIENT_INFO);
-    stat_.ResetStat();
+    UpdateMetric();
   }
 
-  if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
-      status_ != kConnecting) {
+  if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && 
status_ != kConnecting) {
     std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
-    LOG_INFO("reconnect because it has idle "
-             << tcp_idle_time_ << " ms."
-             << "last send time:" << last_update_time_ << CLIENT_INFO);
+    LOG_INFO("Reconnect because it has idle " << tcp_idle_time_ << " ms."
+                                              << "last send time:" << 
last_update_time_ << CLIENT_INFO);
     AsyncConnect();
   }
 
-  keep_alive_timer_->expires_after(
-      std::chrono::milliseconds(tcp_detection_interval_));
-  keep_alive_timer_->async_wait(
-      std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
+  
keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_));
+  keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, 
std::placeholders::_1));
+}
+
+void TcpClient::UpdateMetric() {
+  Metric stat;
+  for (auto &it : stat_map_) {
+    MetricManager::GetInstance()->UpdateMetric(it.first, it.second);
+    stat.Update(it.second);
+    it.second.ResetStat();
+  }
+  LOG_INFO(stat.ToString() << CLIENT_INFO);
 }
 
 void TcpClient::HeartBeat(bool only_heart_heat) {
@@ -308,12 +317,10 @@ void TcpClient::HeartBeat(bool only_heart_heat) {
   only_heart_heat_ = only_heart_heat;
   status_ = kHeartBeat;
   last_update_time_ = Utils::getCurrentMsTime();
-  // status_ = kWriting;
 
   bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4);
   bin_hb_.msg_type = 8;
-  bin_hb_.data_time =
-      htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+  bin_hb_.data_time = htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 
1000));
   bin_hb_.body_ver = 1;
   bin_hb_.body_len = 0;
   bin_hb_.attr_len = 0;
@@ -322,58 +329,52 @@ void TcpClient::HeartBeat(bool only_heart_heat) {
   uint32_t hb_len = sizeof(bin_hb_);
 
   asio::async_write(*socket_, asio::buffer(hb, hb_len),
-                    std::bind(&TcpClient::OnWroten, this, 
std::placeholders::_1,
-                              std::placeholders::_2));
+                    std::bind(&TcpClient::OnWroten, this, 
std::placeholders::_1, std::placeholders::_2));
 }
 
 void TcpClient::ParseHeartBeat(size_t total_length) {
-  //  | total length(4) | msg type(1) | data time(4) | body version(1) | body
-  //  length (4) | body | attr length(2) | attr | magic (2) | skip total length
+  //  | total length(4) | msg type(1) | data time(4) | body version(1) | body 
length (4) | body | attr length(2) | attr
+  //  | magic (2) |
+  //  skip total length
   uint32_t parse_index = sizeof(uint32_t);
   //  skip msg type
   parse_index += sizeof(uint8_t);
   //  skip data time
-  //  uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
-  //  *>(recv_buf_->m_data + parse_index));
+  //  uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t 
*>(recv_buf_->m_data + parse_index));
   parse_index += sizeof(uint32_t);
 
   // 3、parse body version
-  uint32_t body_version =
-      *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+  uint32_t body_version = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data 
+ parse_index);
   parse_index += sizeof(uint8_t);
 
   // 4、parse body length
-  uint32_t body_length = ntohl(
-      *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index));
+  uint32_t body_length = ntohl(*reinterpret_cast<const uint32_t 
*>(recv_buf_->m_data + parse_index));
   parse_index += sizeof(uint32_t);
 
   // 5 parse load
-  int16_t load = ntohs(
-      *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index));
+  int16_t load = ntohs(*reinterpret_cast<const int16_t *>(recv_buf_->m_data + 
parse_index));
   parse_index += sizeof(int16_t);
 
   // 7 parse attr length
-  uint16_t attr_length = ntohs(
-      *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+  uint16_t attr_length = ntohs(*reinterpret_cast<const uint16_t 
*>(recv_buf_->m_data + parse_index));
   parse_index += sizeof(uint16_t);
 
   // 8 skip attr
   parse_index += attr_length;
 
   // 9 parse magic
-  uint16_t magic = ntohs(
-      *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+  uint16_t magic = ntohs(*reinterpret_cast<const uint16_t *>(recv_buf_->m_data 
+ parse_index));
   parse_index += sizeof(uint16_t);
 
   if (magic != constants::kBinaryMagic) {
-    LOG_ERROR("failed to parse heartbeat ack! error magic "
-              << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO);
+    LOG_ERROR("Failed to ParseMsg heartbeat ack! error magic " << magic << " 
!=" << constants::kBinaryMagic
+                                                               << CLIENT_INFO);
     return;
   }
 
   if (total_length + 4 != parse_index) {
-    LOG_ERROR("failed to parse heartbeat ack! total_length "
-              << total_length << " +4 !=" << parse_index << CLIENT_INFO);
+    LOG_ERROR("Failed to ParseMsg heartbeat ack! total_length " << 
total_length << " +4 !=" << parse_index
+                                                                << 
CLIENT_INFO);
     return;
   }
   if (heart_beat_index_ > constants::MAX_STAT) {
@@ -389,12 +390,12 @@ void TcpClient::ParseHeartBeat(size_t total_length) {
 
 void TcpClient::ParseGenericResponse() {
   if (sendBuffer_ != nullptr) {
-    stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
-    stat_.AddSendSuccessPackNum(1);
-
-    stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+    std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + 
sendBuffer_->getStreamId();
+    stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+    stat_map_[stat_key].AddSendSuccessPackNum(1);
+    stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - 
last_update_time_);
 
-    sendBuffer_->releaseBuf();
+    BufferManager::GetInstance()->AddSendBuffer(sendBuffer_);
   }
 }
 
@@ -419,8 +420,7 @@ int32_t TcpClient::GetAvgLoad() {
 void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; }
 
 void TcpClient::UpdateClient(const std::string ip, const uint32_t port) {
-  LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port
-                           << "] replace" << CLIENT_INFO);
+  LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port << 
"] replace" << CLIENT_INFO);
   ip_ = ip;
   port_ = port;
   reset_client_ = true;
@@ -438,5 +438,29 @@ void TcpClient::RestClient() {
 const std::string &TcpClient::getIp() const { return ip_; }
 const std::string &TcpClient::getClientInfo() const { return client_info_; }
 uint32_t TcpClient::getPort() const { return port_; }
+void TcpClient::ResetRetry() {
+  need_retry_ = false;
+  retry_times_ = 0;
+}
+void TcpClient::ResetSendBuffer() {
+  if (sendBuffer_ == nullptr) {
+    return;
+  }
+  retry_times_++;
+  if (retry_times_ > SdkConfig::getInstance()->retry_times_) {
+    std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + 
sendBuffer_->getStreamId();
+    stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt());
+    stat_map_[stat_key].AddSendFailPackNum(1);
+    stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - 
last_update_time_);
+
+    sendBuffer_->doUserCallBack();
+
+    BufferManager::GetInstance()->AddSendBuffer(sendBuffer_);
 
-} // namespace inlong
+    LOG_INFO("resend to proxy fail! retry times:" << retry_times_ << 
CLIENT_INFO);
+    ResetRetry();
+  } else {
+    need_retry_ = true;
+  }
+}
+}  // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
index 6835a7fd26..f2a92fc459 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-#ifndef INLONG_SDK_TCP_CLIENT_H
-#define INLONG_SDK_TCP_CLIENT_H
+#ifndef INLONG_TCP_CLIENT_H
+#define INLONG_TCP_CLIENT_H
 
+#include <queue>
+#include <unordered_map>
+
+#include "../metric/metric.h"
+#include "../protocol/msg_protocol.h"
 #include "../utils/block_memory.h"
 #include "../utils/capi_constant.h"
 #include "../utils/read_write_mutex.h"
 #include "../utils/send_buffer.h"
-#include "msg_protocol.h"
-#include "stat.h"
-#include <queue>
 
 namespace inlong {
 enum ClientStatus {
@@ -46,37 +46,38 @@ enum {
 };
 using IOContext = asio::io_context;
 using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
 class TcpClient {
-private:
+ private:
   TcpSocketPtr socket_;
   SteadyTimerPtr wait_timer_;
   SteadyTimerPtr keep_alive_timer_;
   ClientStatus status_;
   std::string ip_;
 
-public:
+ public:
   const std::string &getIp() const;
 
-private:
+ private:
   uint32_t port_;
 
-public:
+ public:
   uint32_t getPort() const;
 
-private:
+ private:
   std::string client_info_;
 
-public:
+ public:
   const std::string &getClientInfo() const;
 
-private:
+ private:
   std::shared_ptr<SendBuffer> sendBuffer_;
   asio::ip::tcp::endpoint endpoint_;
   BlockMemoryPtrT recv_buf_;
   uint64_t tcp_idle_time_;
   uint32_t tcp_detection_interval_;
   uint64_t last_update_time_;
-  Stat stat_;
+  Metric stat_;
   bool exit_;
   BinaryHB bin_hb_ = {0};
   std::vector<int32_t> proxy_loads_;
@@ -84,8 +85,11 @@ private:
   bool wait_heart_beat_;
   bool reset_client_;
   volatile bool only_heart_heat_;
+  bool need_retry_;
+  uint32_t retry_times_;
+  std::unordered_map <std::string ,Metric> stat_map_;
 
-public:
+ public:
   TcpClient(IOContext &io_context, std::string ip, uint32_t port);
   ~TcpClient();
   void AsyncConnect();
@@ -98,7 +102,7 @@ public:
   void DoClose();
   void HandleFail();
   bool isFree() { return (status_ == kFree); }
-  void write(SendBufferPtrT sendBuffer);
+  void write(SendBufferPtrT sendBuffer, bool retry = false);
   void DetectStatus(const asio::error_code error);
   void HeartBeat(bool only_heart_heat = false);
   void SetHeartBeatStatus();
@@ -107,10 +111,13 @@ public:
   void UpdateClient(const std::string ip, const uint32_t port);
   void RestClient();
   int32_t GetAvgLoad();
+  void ResetRetry();
+  void ResetSendBuffer();
+  void UpdateMetric();
 };
 typedef std::shared_ptr<TcpClient> TcpClientTPtrT;
 typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT;
 typedef TcpClientTPtrVecT::iterator TcpClientTPtrVecItT;
-} // namespace inlong
+}  // namespace inlong
 
-#endif // INLONG_SDK_TCP_CLIENT_H
+#endif  // INLONG_TCP_CLIENT_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 4240a836dc..12a82b07e7 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -436,6 +436,13 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
   } else {
     enable_balance_ = constants::kEnableBalance;
   }
+
+  if (doc.HasMember("retry_times") && doc["retry_times"].IsInt() && 
doc["retry_times"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["retry_times"];
+    retry_times_ = obj.GetInt();
+  } else {
+    retry_times_ = constants::kRetryTimes;
+  }
 }
 void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
   // auth settings
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index 3a649cfb82..ae6cfde58b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -95,6 +95,7 @@ private:
   uint32_t tcp_detection_interval_; // tcp-client detection interval
   bool enable_balance_;
   bool enable_local_cache_;
+  uint32_t retry_times_;
 
 
   // auth settings
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
index 0228f9c96d..f6413717dd 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h
@@ -41,7 +41,8 @@ enum SdkCode {
   kSendBeforeInit = 22,
   kFailMallocBuf = 23,
   kMsgSizeLargerThanPackSize = 24,
-  kSendBufferFull = 25
+  kSendBufferFull = 25,
+  kBufferManagerFull = 26
 };
 }
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 13ce7223b5..7ae2c79ea2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -16,19 +16,20 @@
  */
 
 #include "api_imp.h"
+#include <signal.h>
+#include <iostream>
 #include "../manager/proxy_manager.h"
+#include "../manager/metric_manager.h"
 #include "../utils/capi_constant.h"
 #include "../utils/logger.h"
 #include "../utils/utils.h"
-
-#include "api_code.h"
-#include <iostream>
-#include <signal.h>
-
-#include "metric_manager.h"
+#include "../core/api_code.h"
 
 namespace inlong {
 int32_t ApiImp::InitApi(const char *config_file_path) {
+  if (config_file_path == nullptr) {
+    return SdkCode::kErrorInit;
+  }
   if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
     return SdkCode::kMultiInit;
   }
@@ -36,56 +37,65 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
   user_exit_flag_.getAndSet(0);
 
   if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) {
-    LOG_ERROR("ParseConfig error ");
     return SdkCode::kErrorInit;
   }
-  max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_,
-                             SdkConfig::getInstance()->pack_size_) - 
constants::ATTR_LENGTH;
+
+  max_msg_length_ = SdkConfig::getInstance()->max_msg_size_ - 
constants::ATTR_LENGTH;
   local_ip_ = SdkConfig::getInstance()->local_ip_;
 
   return DoInit();
 }
 
-int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id,
-                     const char *msg, int32_t msg_len, UserCallBack call_back) 
{
+int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char 
*msg, int32_t msg_len,
+                     UserCallBack call_back) {
+  int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+  if(code !=SdkCode::kSuccess){
+    return code;
+  }
+
+  return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back);
+}
+int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char 
*msg, int32_t msg_len,
+                     int64_t data_time, UserCallBack call_back) {
+  int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
+  if(code !=SdkCode::kSuccess){
+    return code;
+  }
+
+  return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back, 
data_time);
+}
+
+int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, 
const char *msg, int32_t msg_len){
   if (msg_len > max_msg_length_) {
+    MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
     return SdkCode::kMsgTooLong;
   }
-  if (inlong_group_id == nullptr || inlong_stream_id == nullptr ||
-      msg == nullptr || msg_len <= 0) {
+  if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len 
<= 0) {
     return SdkCode::kInvalidInput;
   }
 
   if (inited_ == false) {
     return SdkCode::kSendBeforeInit;
   }
-
-  int64_t msg_time = Utils::getCurrentMsTime();
-  return this->SendBase(inlong_group_id, inlong_stream_id, local_ip_, msg_time,
-                        {msg, msg_len}, call_back);
+  return SdkCode::kSuccess;
 }
 
-int32_t ApiImp::SendBase(const std::string inlong_group_id,
-                         const std::string inlong_stream_id,
-                         const std::string client_ip, int64_t report_time,
-                         const std::string msg, UserCallBack call_back) {
-  int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg);
+int32_t ApiImp::SendBase(const std::string& inlong_group_id, const 
std::string& stream_id, const std::string& msg,
+                         UserCallBack call_back, int64_t report_time) {
+  int32_t check_ret = CheckData(inlong_group_id, stream_id, msg);
   if (check_ret != SdkCode::kSuccess) {
     return check_ret;
   }
 
-  ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true);
+  ProxyManager::GetInstance()->CheckGroupIdConf(inlong_group_id, true);
 
-  auto recv_group =
-      recv_manager_->GetRecvGroup(inlong_group_id);
+  auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id);
   if (recv_group == nullptr) {
-    LOG_ERROR("fail to get recv group, inlong_group_id:"
-              << inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
+    LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << 
",getStreamId:" << stream_id);
     return SdkCode::kFailGetRevGroup;
   }
 
-  return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, 
client_ip,
-                              report_time, call_back);
+  return recv_group->SendData(msg, inlong_group_id, stream_id, report_time, 
call_back);
 }
 
 int32_t ApiImp::CloseApi(int32_t max_waitms) {
@@ -98,44 +108,38 @@ int32_t ApiImp::CloseApi(int32_t max_waitms) {
 }
 
 int32_t ApiImp::DoInit() {
-  LOG_INFO(
-      "inlong dataproxy sdk cpp start Init, version:" << constants::kVersion);
+  LOG_INFO("tdbus sdk cpp start Init, version:" << constants::kVersion);
 
   signal(SIGPIPE, SIG_IGN);
 
-  LOG_INFO("inlong dataproxy cpp sdk Init complete!");
+  LOG_INFO("tdbus_sdk_cpp Init complete!");
 
   ProxyManager::GetInstance()->Init();
   MetricManager::GetInstance()->Init();
 
   for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) {
-    LOG_INFO("DoInit CheckConf inlong_group_id:"
-             << SdkConfig::getInstance()->inlong_group_ids_[i]);
-    ProxyManager::GetInstance()->CheckBidConf(
-        SdkConfig::getInstance()->inlong_group_ids_[i], false);
+    LOG_INFO("Do init:" << SdkConfig::getInstance()->inlong_group_ids_[i]);
+    ProxyManager::GetInstance()->CheckGroupIdConf(SdkConfig::getInstance()->inlong_group_ids_[i], false);
   }
 
   return InitManager();
 }
 
-int32_t ApiImp::CheckData(const std::string inlong_group_id,
-                          const std::string inlong_stream_id,
-                          const std::string msg) {
+int32_t ApiImp::CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg) {
   if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
     LOG_ERROR("capi has been closed, Init first and then send");
     return SdkCode::kSendAfterClose;
   }
 
-  if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) {
-    LOG_ERROR("invalid input, inlong_group_id"
-              << inlong_group_id << " inlong_stream_id" << inlong_stream_id);
+  if (msg.empty() || group_id.empty() || stream_id.empty()) {
+    LOG_ERROR("invalid input, group id:" << group_id << " stream id:" << stream_id << "msg" << msg);
     return SdkCode::kInvalidInput;
   }
 
   if (msg.size() > SdkConfig::getInstance()->max_msg_size_) {
-    LOG_ERROR("msg len is too long, cur msg_len"
-              << msg.size() << " ext_pack_size"
-              << SdkConfig::getInstance()->max_msg_size_);
+    MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
+    LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " ext_pack_size"
+                                                     << SdkConfig::getInstance()->max_msg_size_);
     return SdkCode::kMsgTooLong;
   }
 
@@ -156,15 +160,15 @@ int32_t ApiImp::InitManager() {
   init_succeed_ = true;
   return SdkCode::kSuccess;
 }
-int32_t
-ApiImp::AddInLongGroupId(const std::vector<std::string> &inlong_group_ids) {
+int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) {
   if (inited_ == false) {
     return SdkCode::kSendBeforeInit;
   }
-  for (auto inlong_group_id : inlong_group_ids) {
-    ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, false);
+  for (auto group_id : group_ids) {
+    ProxyManager::GetInstance()->CheckGroupIdConf(group_id, false);
   }
+  return SdkCode::kSuccess;
 }
 ApiImp::ApiImp() = default;
 ApiImp::~ApiImp() = default;
-} // namespace inlong
\ No newline at end of file
+}  // namespace inlong
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index c6e6ecd0e4..32ca093416 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -15,45 +15,43 @@
  * limitations under the License.
  */
 
-#ifndef INLONG_SDK_API_IMP_H
-#define INLONG_SDK_API_IMP_H
+#ifndef INLONG_API_IMP_H
+#define INLONG_API_IMP_H
 
+#include <cstdint>
+#include <functional>
 #include "../config/sdk_conf.h"
 #include "../manager/recv_manager.h"
 #include "../manager/send_manager.h"
 #include "../utils/atomic.h"
-#include <cstdint>
-#include <functional>
 
 namespace inlong {
-
-typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
-                            const int64_t, const char *);
+typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, const int64_t, const char *);
 
 class ApiImp {
-public:
+ public:
   ApiImp();
   ~ApiImp();
   int32_t InitApi(const char *config_file_path);
 
-  int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
-               const char *msg, int32_t msg_len,
+  int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
                UserCallBack call_back = nullptr);
 
+  int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
+               int64_t report_time, UserCallBack call_back = nullptr);
   int32_t CloseApi(int32_t max_waitms);
 
-  int32_t AddInLongGroupId(const std::vector<std::string> &inlong_group_ids);
+  int32_t AddInLongGroupId(const std::vector<std::string> &group_ids);
 
-private:
+ private:
   int32_t DoInit();
   int32_t InitManager();
-  int32_t SendBase(const std::string inlong_group_id,
-                   const std::string inlong_stream_id,
-                   const std::string client_ip, int64_t report_time,
-                   const std::string msg, UserCallBack call_back);
+  int32_t SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, UserCallBack call_back,
+                   int64_t report_time = 0);
+
+  int32_t CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg);
 
-  int32_t CheckData(const std::string inlong_group_id,
-                    const std::string inlong_stream_id, const std::string msg);
+  int32_t ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len);
 
   AtomicInt user_exit_flag_{0};
   volatile bool init_flag_ = false;
@@ -67,5 +65,5 @@ private:
   std::shared_ptr<SendManager> send_manager_;
 };
 
-} // namespace inlong
-#endif // INLONG_SDK_API_IMP_H
+}  // namespace inlong
+#endif  // INLONG_API_IMP_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
index 8ae749a352..d9e8366486 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h
@@ -48,7 +48,16 @@ struct SdkMsg {
         data_pack_format_attr_(attr),
         inlong_group_id_(inlong_group_id),
         inlong_stream_id_(inlong_stream_id){}
-  SdkMsg() {};
+  SdkMsg(){};
+  void setMsg(const std::string &msg) { msg_ = msg; }
+  void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; }
+  void setReportTime(uint64_t reportTime) { report_time_ = reportTime; }
+  void setCb(UserCallBack cb) { cb_ = cb;}
+  void setUserReportTime(uint64_t userReportTime) { user_report_time_ = userReportTime; }
+  void setUserClientIp(const std::string &userClientIp) { user_client_ip_ = userClientIp; }
+  void setDataPackFormatAttr(const std::string &dataPackFormatAttr) { data_pack_format_attr_ = dataPackFormatAttr; }
+  void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; }
+  void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; }
 };
 using SdkMsgPtr = std::shared_ptr<SdkMsg>;
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index b852095f68..377432a2ef 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -1,49 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 #include "recv_group.h"
 
-#include "../protocol/msg_protocol.h"
-#include "../utils/capi_constant.h"
-#include "../utils/utils.h"
-#include "api_code.h"
-#include <cstdlib>
 #include <functional>
+#include "../core/api_code.h"
+#include "../manager/buffer_manager.h"
+#include "../utils/utils.h"
+#include "../manager/msg_manager.h"
+#include "../manager/metric_manager.h"
 
 namespace inlong {
 const uint32_t DEFAULT_PACK_ATTR = 400;
-const uint64_t LOG_SAMPLE=100;
-RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager)
-    : cur_len_(0), groupId_num_(0), streamId_num_(0),
+const uint64_t LOG_SAMPLE = 100;
+RecvGroup::RecvGroup(const std::string &group_key, std::shared_ptr<SendManager> send_manager)
+    : cur_len_(0),
+      group_key_(group_key),
+      groupId_num_(0),
+      streamId_num_(0),
       msg_type_(SdkConfig::getInstance()->msg_type_),
-      data_capacity_(SdkConfig::getInstance()->buf_size_),
-      send_manager_(send_manager),group_key_(group_key),
-      log_stat_(0){
-  data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
-                            SdkConfig::getInstance()->pack_size_);
+      data_capacity_(SdkConfig::getInstance()->recv_buf_size_),
+      send_manager_(send_manager),
+      log_stat_(0),
+      send_group_(nullptr),
+      max_msg_size_(0),
+      uniq_id_(0) {
+  data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_);
   data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
 
   pack_buf_ = new char[data_capacity_];
   memset(pack_buf_, 0x0, data_capacity_);
-  data_time_ = 0;
+  data_time_ = Utils::getCurrentMsTime();
   last_pack_time_ = Utils::getCurrentMsTime();
   max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
-
-  LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_);
+  local_ip_ = SdkConfig::getInstance()->local_ip_;
+  LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ << ",max_recv_size:" << max_recv_size_);
 }
 
 RecvGroup::~RecvGroup() {
@@ -53,136 +60,108 @@ RecvGroup::~RecvGroup() {
   }
 }
 
-int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId,
-                            const std::string &streamId,
-                            const std::string &client_ip, uint64_t report_time,
-                            UserCallBack call_back) {
+int32_t RecvGroup::SendData(const std::string &msg, const std::string &inlong_group_id_, const std::string &inlong_stream_id_,
+                            uint64_t report_time, UserCallBack call_back) {
   std::lock_guard<std::mutex> lck(mutex_);
-
   if (msg.size() + cur_len_ > max_recv_size_) {
+    MetricManager::GetInstance()->AddReceiveBufferFullCount(inlong_group_id_,inlong_stream_id_,1);
     return SdkCode::kRecvBufferFull;
   }
 
-  AddMsg(msg, client_ip, report_time, call_back,groupId,streamId);
-
-  return SdkCode::kSuccess;
-}
+  uint64_t data_time = (report_time == 0) ? data_time_ : report_time;
 
-int32_t RecvGroup::DoDispatchMsg() {
-  last_pack_time_ = Utils::getCurrentMsTime();
-  std::lock_guard<std::mutex> lck(mutex_);
-  if (group_key_.empty()) {
-    if (log_stat_++ > LOG_SAMPLE) {
-      LOG_ERROR("groupId  is empty, check!!");
-      log_stat_ = 0;
-    }
-    return SdkCode::kInvalidInput;
-  }
-  if (msgs_.empty()) {
-    if (log_stat_++ > LOG_SAMPLE) {
-      LOG_ERROR("no msg in msg_set, check!");
-      log_stat_ = 0;
-    }
-    return SdkCode::kMsgEmpty;
-  }
-  auto send_group = send_manager_->GetSendGroup(group_key_);
-  if (send_group == nullptr) {
-    if (log_stat_++ > LOG_SAMPLE) {
-      LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
-      log_stat_ = 0;
-    }
-    return SdkCode::kFailGetSendBuf;
-  }
-  if (!send_group->IsAvailable()) {
-    if (log_stat_++ > LOG_SAMPLE) {
-      LOG_ERROR("failed to get send group! group_key:"
-                << group_key_ << " send group is not available!");
-      log_stat_ = 0;
-    }
-    return SdkCode::kFailGetConn;
+  std::string data_pack_format_attr =
+      "__addcol1__reptime=" + Utils::getFormatTime(data_time) + "&__addcol2__ip=" + local_ip_;
+  max_msg_size_ = std::max(max_msg_size_, msg.size());
+  auto it = recv_queue_.find(inlong_group_id_ + inlong_stream_id_);
+  if (it == recv_queue_.end()) {
+    std::queue<SdkMsgPtr> tmp;
+    it = recv_queue_.insert(recv_queue_.begin(), std::make_pair(inlong_group_id_ + inlong_stream_id_, tmp));
   }
-  if (send_group->IsFull()) {
-    if (log_stat_++ > LOG_SAMPLE) {
-      LOG_ERROR("failed to get send group! group_key:"
-                << group_key_ << " send group is full!");
-      log_stat_ = 0;
-    }
-    return SdkCode::kSendBufferFull;
+  SdkMsgPtr msg_ptr = MsgManager::GetInstance()->GetMsg();
+  if(nullptr == msg_ptr){
+    it->second.emplace(std::make_shared<SdkMsg>(msg, local_ip_, data_time, call_back, data_pack_format_attr, local_ip_, data_time, inlong_group_id_, inlong_stream_id_));
+  }else{
+    msg_ptr->setMsg(msg);
+    msg_ptr->setClientIp(local_ip_);
+    msg_ptr->setReportTime(data_time);
+    msg_ptr->setCb(call_back);
+    msg_ptr->setDataPackFormatAttr(data_pack_format_attr);
+    msg_ptr->setUserClientIp(local_ip_);
+    msg_ptr->setUserReportTime(data_time);
+    msg_ptr->setGroupId(inlong_group_id_);
+    msg_ptr->setStreamId(inlong_stream_id_);
+    it->second.emplace(msg_ptr);
   }
 
-  uint32_t total_length = 0;
-  uint64_t max_tid_size = 0;
-  std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch;
-  std::unordered_map<std::string, uint64_t> tid_stat;
-  while (!msgs_.empty()) {
-    SdkMsgPtr msg = msgs_.front();
-    if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > SdkConfig::getInstance()->pack_size_) {
-      if (!msgs_to_dispatch.empty()) {
-        break;
-      }
-    }
-    std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_;
-    msgs_to_dispatch[msg_key].push_back(msg);
-    msgs_.pop();
-
-    total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
+  cur_len_ = cur_len_ + msg.size() + constants::ATTR_LENGTH;
 
-    if (tid_stat.find(msg_key) == tid_stat.end()) {
-      tid_stat[msg_key] = 0;
-    }
-    tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + constants::ATTR_LENGTH;
+  return SdkCode::kSuccess;
+}
 
-    max_tid_size = std::max(tid_stat[msg_key], max_tid_size);
+void RecvGroup::DoDispatchMsg() {
+  if (!CanDispatch()) {
+    return;
   }
-
-  cur_len_ = cur_len_ - total_length;
-
-  for (auto it : msgs_to_dispatch) {
-    std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second);
-
-    ResetPackBuf();
-
-    if (send_buffer == nullptr) {
-      CallbalkToUsr(it.second);
-      continue;
-    }
-
-    int ret = send_group->PushData(send_buffer);
-    if (ret != SdkCode::kSuccess) {
-      CallbalkToUsr(it.second);
+  last_pack_time_ = Utils::getCurrentMsTime();
+  data_time_ = last_pack_time_;
+  while (!fail_queue_.empty()) {
+    SendBufferPtrT tmp_ptr = fail_queue_.front();
+    if (SdkCode::kSuccess != send_group_->PushData(tmp_ptr)) {
+      return;
     }
+    fail_queue_.pop();
   }
 
-  return SdkCode::kSuccess;
-}
-
-void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
-                       int64_t report_time, UserCallBack call_back,const std::string &groupId,
-                       const std::string &streamId) {
-  if (Utils::isLegalTime(report_time))
-    data_time_ = report_time;
-  else {
-    data_time_ = Utils::getCurrentMsTime();
+  {
+    std::lock_guard<std::mutex> lck(mutex_);
+    recv_queue_.swap(dispatch_queue_);
   }
 
-  std::string user_client_ip = client_ip;
-  int64_t user_report_time = report_time;
+  for (auto &it : dispatch_queue_) {
+    std::vector<SdkMsgPtr> msg_vec;
+    uint64_t msg_size = 0;
+    while (!it.second.empty()) {
+      SdkMsgPtr msg = it.second.front();
+      msg_vec.push_back(msg);
+      msg_size = msg_size + msg->msg_.size() + constants::ATTR_LENGTH;
+      it.second.pop();
+
+      if ((msg_size + max_msg_size_) >= SdkConfig::getInstance()->pack_size_) {
+        uint32_t ret = ParseMsg(msg_vec);
+        if (SdkCode::kBufferManagerFull == ret) {
+          for (const auto &it_msg : msg_vec) {
+            it.second.emplace(it_msg);
+          }
+          return;
+        }
+        UpdateCurrentMsgLen(msg_size);
+        msg_size = 0;
+
+        if (SdkCode::kSuccess != ret) {
+          return;
+        }
+        std::vector<SdkMsgPtr>().swap(msg_vec);
+      }
+    }
+    if (!msg_vec.empty()) {
+      uint32_t ret = ParseMsg(msg_vec);
+      if (SdkCode::kBufferManagerFull == ret) {
+        for (const auto &it_msg : msg_vec) {
+          it.second.emplace(it_msg);
+        }
+        return;
+      }
+      UpdateCurrentMsgLen(msg_size);
 
-  if (client_ip.empty()) {
-    client_ip = "127.0.0.1";
+      if (SdkCode::kSuccess != ret) {
+        return;
+      }
+    }
   }
-  std::string data_pack_format_attr =
-      "__addcol1__reptime=" + Utils::getFormatTime(data_time_) +
-      "&__addcol2__ip=" + client_ip;
-  msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back,
-                                      data_pack_format_attr, user_client_ip,
-                                      user_report_time,groupId,streamId));
-
-  cur_len_ += msg.size() + constants::ATTR_LENGTH;
 }
 
-bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
-                        uint32_t &out_len, uint32_t uniq_id) {
+bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &out_len) {
   if (pack_data == nullptr) {
     LOG_ERROR("nullptr, failed to allocate memory for buf");
     return false;
@@ -196,7 +175,6 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
     memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size());
     idx += static_cast<uint32_t>(it->msg_.size());
 
-    // add attrlen|attr
     if (SdkConfig::getInstance()->isAttrDataPackFormat()) {
       *(uint32_t *)(&pack_buf_[idx]) = htonl(it->data_pack_format_attr_.size());
       idx += sizeof(uint32_t);
@@ -247,7 +225,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
       groupId_num = 0;
       streamId_num = 0;
       groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
-                              "&streamId=" + msgs[0]->inlong_stream_id_;
+          "&streamId=" + msgs[0]->inlong_stream_id_;
       char_groupId_flag = 0x4;
     } else {
       groupId_num = groupId_num_;
@@ -261,14 +239,14 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
     if (SdkConfig::getInstance()->enableTraceIP()) {
       if (groupId_streamId_char.empty())
         attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ +
-               "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+            "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
       else
         attr = groupId_streamId_char +
-               "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
-               "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+            "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
+            "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
     } else {
       attr = "groupId=" + msgs[0]->inlong_group_id_ +
-             "&streamId=" + msgs[0]->inlong_stream_id_;
+          "&streamId=" + msgs[0]->inlong_stream_id_;
     }
     *(uint16_t *)bodyBegin = htons(attr.size());
     bodyBegin += sizeof(uint16_t);
@@ -294,7 +272,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
     p += 4;
     *(uint16_t *)p = htons(cnt);
     p += 2;
-    *(uint32_t *)p = htonl(uniq_id);
+    *(uint32_t *)p = htonl(uniq_id_);
 
     out_len = total_len + 4;
   } else {
@@ -319,10 +297,10 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
     // attr
     std::string attr;
     attr = "groupId=" + msgs[0]->inlong_group_id_ +
-           "&streamId=" + msgs[0]->inlong_stream_id_;
+        "&streamId=" + msgs[0]->inlong_stream_id_;
 
     attr += "&dt=" + std::to_string(data_time_);
-    attr += "&mid=" + std::to_string(uniq_id);
+    attr += "&mid=" + std::to_string(uniq_id_);
     if (isSnappy)
       attr += "&cp=snappy";
     attr += "&cnt=" + std::to_string(cnt);
@@ -352,56 +330,97 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t real_cur_len) {
 }
 
 void RecvGroup::DispatchMsg(bool exit) {
-  if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty())
-    return;
+  if (cur_len_ <= constants::ATTR_LENGTH) return;
   bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_;
-  bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
-                     SdkConfig::getInstance()->pack_timeout_;
+  bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) > SdkConfig::getInstance()->pack_timeout_;
   if (len_enough || time_enough) {
     DoDispatchMsg();
   }
 }
-std::shared_ptr<SendBuffer>
-RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
+std::shared_ptr<SendBuffer> RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
   if (msgs.empty()) {
-    LOG_ERROR("pack msgs is empty.");
     return nullptr;
   }
-  std::shared_ptr<SendBuffer> send_buffer =
-      std::make_shared<SendBuffer>(data_capacity_);
+  std::shared_ptr<SendBuffer> send_buffer = BufferManager::GetInstance()->GetSendBuffer();
   if (send_buffer == nullptr) {
-    LOG_ERROR("make send buffer failed.");
     return nullptr;
   }
+
   uint32_t len = 0;
-  int32_t msg_cnt = msgs.size();
-  uint32_t uniq_id = g_send_msgid.incrementAndGet();
+  uint32_t msg_cnt = msgs.size();
+  if (++uniq_id_ >= constants::kMaxSnowFlake) {
+    uniq_id_ = 0;
+  }
 
-  if (!PackMsg(msgs, send_buffer->content(), len, uniq_id) || len == 0) {
-    LOG_ERROR("failed to write data to send buf from pack queue, sendQueue "
-              "id:%d, buf id:%d");
+  if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) {
+    LOG_ERROR("failed to write data to send buf from pack queue, sendQueue");
     return nullptr;
   }
+
   send_buffer->setLen(len);
   send_buffer->setMsgCnt(msg_cnt);
   send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
   send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
-  send_buffer->setUniqId(uniq_id);
-  send_buffer->setIsPacked(true);
-  for (auto it : msgs) {
-    send_buffer->addUserMsg(it);
+  for (const auto &it : msgs) {
+    if(it->cb_){
+      send_buffer->addUserMsg(it);
+    }
   }
 
   return send_buffer;
 }
 
-void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) {
-  for (auto &it : msgs) {
-    if (it->cb_) {
-      it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
-              it->msg_.data(), it->msg_.size(), it->user_report_time_,
-              it->user_client_ip_.data());
+uint32_t RecvGroup::ParseMsg(std::vector<SdkMsgPtr> &msg_vec) {
+  if (msg_vec.empty()) {
+    return SdkCode::kSuccess;
+  }
+
+  std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msg_vec);
+
+  if (send_buffer == nullptr) {
+    return SdkCode::kBufferManagerFull;
+  }
+
+  uint32_t ret = send_group_->PushData(send_buffer);
+  if (ret != SdkCode::kSuccess) {
+    fail_queue_.push(send_buffer);
+  }
+  return ret;
+}
+bool RecvGroup::CanDispatch() {
+  if (group_key_.empty()) {
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("Group key is empty!");
+      log_stat_ = 0;
+    }
+    return false;
+  }
+  if (nullptr == send_group_) {
+    send_group_ = send_manager_->GetSendGroup(group_key_);
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send group! group_key:" << group_key_);
+      log_stat_ = 0;
+    }
+    return false;
+  }
+  if (!send_group_->IsAvailable()) {
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is not available!");
+      log_stat_ = 0;
     }
+    return false;
   }
+  if (send_group_->IsFull()) {
+    if (log_stat_++ > LOG_SAMPLE) {
+      LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is full!");
+      log_stat_ = 0;
+    }
+    return false;
+  }
+  return true;
+}
+void RecvGroup::UpdateCurrentMsgLen(uint64_t msg_size) {
+  std::lock_guard<std::mutex> lck(mutex_);
+  cur_len_ = cur_len_ - msg_size;
 }
-} // namespace inlong
\ No newline at end of file
+}  // namespace inlong
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index adac9a7f10..839c49e14d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 #ifndef INLONG_SDK_RECV_GROUP_H
@@ -25,17 +27,19 @@
 #include <unordered_map>
 
 #include "../config/sdk_conf.h"
+#include "../core/sdk_msg.h"
 #include "../manager/send_manager.h"
 #include "../utils/atomic.h"
 #include "../utils/noncopyable.h"
 
 namespace inlong {
 class RecvGroup {
-private:
-  char *pack_buf_;
+ private:
+  char* pack_buf_;
   std::queue<SdkMsgPtr> msgs_;
+  std::queue<SdkMsgPtr> fail_msgs_;
   uint32_t data_capacity_;
-  uint32_t cur_len_;
+  uint64_t cur_len_;
   AtomicInt pack_err_;
   uint64_t data_time_;
   uint16_t groupId_num_;
@@ -47,33 +51,36 @@ private:
   uint64_t last_pack_time_;
 
   uint64_t max_recv_size_;
+
   std::string group_key_;
   uint64_t log_stat_;
+  SendGroupPtr send_group_;
+  std::string local_ip_;
+  uint64_t max_msg_size_;
+  uint64_t uniq_id_;
+
+  std::unordered_map<std::string, std::queue<SdkMsgPtr>> recv_queue_;
+  std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_;
+  std::queue<SendBufferPtrT> fail_queue_;
 
-  int32_t DoDispatchMsg();
-  void AddMsg(const std::string &msg, std::string client_ip,
-              int64_t report_time, UserCallBack call_back,const std::string &groupId,
-              const std::string &streamId);
-  bool IsZipAndOperate(std::string &res, uint32_t real_cur_len);
+  void DoDispatchMsg();
+  bool IsZipAndOperate(std::string& res, uint32_t real_cur_len);
   inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
+  uint32_t ParseMsg(std::vector<SdkMsgPtr>& msg_vec);
 
-public:
-  RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager);
+ public:
+  RecvGroup(const std::string& group_key, std::shared_ptr<SendManager> send_manager);
   ~RecvGroup();
 
-  int32_t SendData(const std::string &msg, const std::string &groupId,
-                   const std::string &streamId, const std::string &client_ip,
-                   uint64_t report_time, UserCallBack call_back);
-  bool PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, uint32_t &out_len,
-               uint32_t uniq_id);
+  int32_t SendData(const std::string& msg, const std::string& inlong_group_id_, const std::string& inlong_stream_id_, uint64_t report_time,
+                   UserCallBack call_back);
+  bool PackMsg(std::vector<SdkMsgPtr>& msgs, char* pack_data, uint32_t& out_len);
   void DispatchMsg(bool exit);
-
-  char *data() const { return pack_buf_; }
-
-  std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs);
-  void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs);
+  std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr>& msgs);
+  bool CanDispatch();
+  void UpdateCurrentMsgLen(uint64_t msg_size);
 };
 using RecvGroupPtr = std::shared_ptr<RecvGroup>;
-} // namespace inlong
+}  // namespace inlong
 
-#endif // INLONG_SDK_RECV_GROUP_H
\ No newline at end of file
+#endif  // INLONG_SDK_RECV_GROUP_H
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 09014d728d..1653d68de0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -217,8 +217,8 @@ int32_t ProxyManager::GetProxy(const std::string &key,
   return GetProxyByClusterId(key, proxy_info_vec);
 }
 
-int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
-                                   bool is_inited) {
+int32_t ProxyManager::CheckGroupIdConf(const std::string &inlong_group_id,
+                                       bool is_inited) {
   {
     unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
     auto it = groupid_2_cluster_id_map_.find(inlong_group_id);
@@ -379,7 +379,7 @@ void ProxyManager::WriteLocalCache() {
   } catch (...) {
     LOG_ERROR("WriteLocalCache error!");
   }
-  LOG_INFO("WriteLocalCache bid number:" << groupid_count);
+  LOG_INFO("WriteLocalCache getGroupId number:" << groupid_count);
 }
 
 std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 3cd3e5491f..419bc60798 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -62,7 +62,7 @@ public:
     static ProxyManager instance;
     return &instance;
   }
-  int32_t CheckBidConf(const std::string &inlong_group_id, bool is_inited);
+  int32_t CheckGroupIdConf(const std::string &inlong_group_id, bool is_inited);
   void Update();
   void DoUpdate();
   void Init();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 616f018b90..47282e0e32 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -22,6 +22,7 @@
 
 #include "string.h"
 #include <stdint.h>
+#include <limits>
 
 namespace inlong {
 namespace constants {
@@ -79,6 +80,9 @@ static const uint32_t kReserveProxyNum = 2;
 static const bool kEnableTCPNagle = true;
 static const uint32_t kTcpIdleTime = 600000;
 static const uint32_t kTcpDetectionInterval = 60000;
+static const uint32_t kMaxRetryIntervalMs= 3000;
+static const uint32_t kRetryIntervalMs= 200;
+static const int32_t kRetryTimes = 1;
 
 static const char kSerIP[] = "127.0.0.1";
 static const uint32_t kSerPort = 46801;
@@ -87,6 +91,7 @@ static const uint32_t kMsgType = 7;
 static const bool kEnableSetAffinity = false;
 static const uint32_t kMaskCPUAffinity = 0xff;
 static const uint16_t kExtendField = 0;
+static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max();
 
 // http basic auth
 static const char kBasicAuthHeader[] = "Authorization:";
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 1f2970b17d..5b7e5a596a 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -72,8 +72,8 @@ public:
   void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; }
   uint32_t len() { return len_; }
   void setLen(const uint32_t len) { len_ = len; }
-  std::string bid() { return inlong_group_id_; }
-  std::string tid() { return inlong_stream_id_; }
+  std::string getGroupId() { return inlong_group_id_; }
+  std::string getStreamId() { return inlong_stream_id_; }
   void setInlongGroupId(const std::string &inlong_group_id) {
     inlong_group_id_ = inlong_group_id;
   }

Reply via email to