This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab3f2cc416 [INLONG-9293][SDK] Optimize the problem that the more 
inlong group ids there are, the more memory is consumed. (#9295)
ab3f2cc416 is described below

commit ab3f2cc4162dfe633d5dabf33f0f4c828c0d2e9d
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Nov 16 09:40:20 2023 +0800

    [INLONG-9293][SDK] Optimize the problem that the more inlong group ids there 
are, the more memory is consumed. (#9295)
---
 .../dataproxy-sdk-cpp/release/inc/api_code.h       |  2 +-
 .../dataproxy-sdk-cpp/release/inc/sdk_msg.h        | 10 ++-
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          |  2 +-
 .../dataproxy-sdk-cpp/src/group/recv_group.cc      | 93 ++++++++++++----------
 .../dataproxy-sdk-cpp/src/group/recv_group.h       | 13 +--
 .../dataproxy-sdk-cpp/src/group/send_group.cc      |  4 +-
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc |  2 +-
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |  2 +-
 .../dataproxy-sdk-cpp/src/manager/recv_manager.cc  | 19 +++--
 .../dataproxy-sdk-cpp/src/manager/recv_manager.h   |  2 +-
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  | 10 +--
 .../dataproxy-sdk-cpp/src/utils/send_buffer.h      |  2 +-
 12 files changed, 87 insertions(+), 74 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
index 227f6f9e24..0228f9c96d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
@@ -31,7 +31,7 @@ enum SdkCode {
   kInvalidBid = 12,
   kFailGetBufferPool = 13,
   kFailGetSendBuf = 14,
-  kFailWriteToBuf = 15,
+  kMsgEmpty = 15,
   kErrorCURL = 16,
   kErrorParseJson = 17,
   kFailGetRevGroup = 18,
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
index ae77b7efcf..df7e90f4ac 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
@@ -36,12 +36,18 @@ struct SdkMsg {
   std::string user_client_ip_;
 
   std::string data_pack_format_attr_;
+
+  std::string inlong_group_id_;
+  std::string inlong_stream_id_;
+
   SdkMsg(const std::string &mmsg, const std::string &mclient_ip,
          int64_t mreport_time, UserCallBack mcb, const std::string &attr,
-         const std::string &u_ip, int64_t u_time)
+         const std::string &u_ip, int64_t u_time,const std::string& 
inlong_group_id,const std::string& inlong_stream_id)
       : msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time),
         cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip),
-        data_pack_format_attr_(attr) {}
+        data_pack_format_attr_(attr),
+        inlong_group_id_(inlong_group_id),
+        inlong_stream_id_(inlong_stream_id){}
 };
 using SdkMsgPtr = std::shared_ptr<SdkMsg>;
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index fd5b1b8312..f058142841 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -75,7 +75,7 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
   ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true);
 
   auto recv_group =
-      recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
+      recv_manager_->GetRecvGroup(inlong_group_id);
   if (recv_group == nullptr) {
     LOG_ERROR("fail to get recv group, inlong_group_id:"
               << inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 9d41942e97..79931bd434 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -26,25 +26,22 @@
 
 namespace inlong {
 const uint32_t DEFAULT_PACK_ATTR = 400;
-RecvGroup::RecvGroup(const std::string &inlong_group_id,
-                     const std::string &inlong_stream_id,
-                     std::shared_ptr<SendManager> send_manager)
-    : cur_len_(0), inlong_group_id_(inlong_group_id),
-      inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0),
+RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> 
send_manager)
+    : cur_len_(0), groupId_num_(0), streamId_num_(0),
       msg_type_(SdkConfig::getInstance()->msg_type_),
       data_capacity_(SdkConfig::getInstance()->buf_size_),
-      send_manager_(send_manager) {
+      send_manager_(send_manager),group_key_(group_key) {
   data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
                             SdkConfig::getInstance()->pack_size_);
   data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
 
   pack_buf_ = new char[data_capacity_];
   memset(pack_buf_, 0x0, data_capacity_);
-  topic_desc_ =
-      "groupId=" + inlong_group_id_ + "&streamId=" + inlong_stream_id_;
   data_time_ = 0;
   last_pack_time_ = Utils::getCurrentMsTime();
   max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
+
+  
LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_);
 }
 
 RecvGroup::~RecvGroup() {
@@ -64,7 +61,7 @@ int32_t RecvGroup::SendData(const std::string &msg, const 
std::string &groupId,
     return SdkCode::kRecvBufferFull;
   }
 
-  AddMsg(msg, client_ip, report_time, call_back);
+  AddMsg(msg, client_ip, report_time, call_back,groupId,streamId);
 
   return SdkCode::kSuccess;
 }
@@ -72,15 +69,15 @@ int32_t RecvGroup::SendData(const std::string &msg, const 
std::string &groupId,
 int32_t RecvGroup::DoDispatchMsg() {
   last_pack_time_ = Utils::getCurrentMsTime();
   std::lock_guard<std::mutex> lck(mutex_);
-  if (inlong_group_id_.empty()) {
+  if (group_key_.empty()) {
     LOG_ERROR("groupId  is empty, check!!");
     return SdkCode::kInvalidInput;
   }
   if (msgs_.empty()) {
     LOG_ERROR("no msg in msg_set, check!");
-    return SdkCode::kFailGetRevGroup;
+    return SdkCode::kMsgEmpty;
   }
-  auto send_group = send_manager_->GetSendGroup(inlong_group_id_);
+  auto send_group = send_manager_->GetSendGroup(group_key_);
   if (send_group == nullptr) {
     LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
     return SdkCode::kFailGetSendBuf;
@@ -93,38 +90,54 @@ int32_t RecvGroup::DoDispatchMsg() {
   }
 
   uint32_t total_length = 0;
-  std::vector<SdkMsgPtr> msgs_to_dispatch;
+  uint64_t max_tid_size = 0;
+  std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch;
+  std::unordered_map<std::string, uint64_t> tid_stat;
   while (!msgs_.empty()) {
     SdkMsgPtr msg = msgs_.front();
-    if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
-        SdkConfig::getInstance()->pack_size_) {
-      break;
+    if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > 
SdkConfig::getInstance()->pack_size_) {
+      if (!msgs_to_dispatch.empty()) {
+        break;
+      }
     }
-    msgs_to_dispatch.push_back(msg);
+    std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_;
+    msgs_to_dispatch[msg_key].push_back(msg);
     msgs_.pop();
+
     total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
+
+    if (tid_stat.find(msg_key) == tid_stat.end()) {
+      tid_stat[msg_key] = 0;
+    }
+    tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + 
constants::ATTR_LENGTH;
+
+    max_tid_size = std::max(tid_stat[msg_key], max_tid_size);
   }
 
   cur_len_ = cur_len_ - total_length;
 
-  std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msgs_to_dispatch);
+  for (auto it : msgs_to_dispatch) {
+    std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second);
 
-  ResetPackBuf();
+    ResetPackBuf();
 
-  if (send_buffer == nullptr) {
-    CallbalkToUsr(msgs_to_dispatch);
-    return SdkCode::kSuccess;
-  }
+    if (send_buffer == nullptr) {
+      CallbalkToUsr(it.second);
+      continue;
+    }
 
-  int ret = send_group->PushData(send_buffer);
-  if (ret != SdkCode::kSuccess) {
-    CallbalkToUsr(msgs_to_dispatch);
+    int ret = send_group->PushData(send_buffer);
+    if (ret != SdkCode::kSuccess) {
+      CallbalkToUsr(it.second);
+    }
   }
+
   return SdkCode::kSuccess;
 }
 
 void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
-                       int64_t report_time, UserCallBack call_back) {
+                       int64_t report_time, UserCallBack call_back,const 
std::string &groupId,
+                       const std::string &streamId) {
   if (Utils::isLegalTime(report_time))
     data_time_ = report_time;
   else {
@@ -142,19 +155,11 @@ void RecvGroup::AddMsg(const std::string &msg, 
std::string client_ip,
       "&__addcol2__ip=" + client_ip;
   msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back,
                                       data_pack_format_attr, user_client_ip,
-                                      user_report_time));
+                                      user_report_time,groupId,streamId));
 
   cur_len_ += msg.size() + constants::ATTR_LENGTH;
 }
 
-bool RecvGroup::ShouldPack(int32_t msg_len) {
-  if (0 == cur_len_ || msgs_.empty())
-    return false;
-  if (msg_len + cur_len_ > SdkConfig::getInstance()->pack_size_)
-    return true;
-  return false;
-}
-
 bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
                         uint32_t &out_len, uint32_t uniq_id) {
   if (pack_data == nullptr) {
@@ -220,7 +225,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,
         streamId_num_ == 0) {
       groupId_num = 0;
       streamId_num = 0;
-      groupId_streamId_char = topic_desc_;
+      groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
+                              "&streamId=" + msgs[0]->inlong_stream_id_;
       char_groupId_flag = 0x4;
     } else {
       groupId_num = groupId_num_;
@@ -240,7 +246,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,
                "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
                "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
     } else {
-      attr = topic_desc_;
+      attr = "groupId=" + msgs[0]->inlong_group_id_ +
+             "&streamId=" + msgs[0]->inlong_stream_id_;
     }
     *(uint16_t *)bodyBegin = htons(attr.size());
     bodyBegin += sizeof(uint16_t);
@@ -290,7 +297,9 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,
 
     // attr
     std::string attr;
-    attr = topic_desc_;
+    attr = "groupId=" + msgs[0]->inlong_group_id_ +
+           "&streamId=" + msgs[0]->inlong_stream_id_;
+
     attr += "&dt=" + std::to_string(data_time_);
     attr += "&mid=" + std::to_string(uniq_id);
     if (isSnappy)
@@ -354,8 +363,8 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
   }
   send_buffer->setLen(len);
   send_buffer->setMsgCnt(msg_cnt);
-  send_buffer->setInlongGroupId(inlong_group_id_);
-  send_buffer->setStreamId(inlong_stream_id_);
+  send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
+  send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
   send_buffer->setUniqId(uniq_id);
   send_buffer->setIsPacked(true);
   for (auto it : msgs) {
@@ -368,7 +377,7 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
 void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) {
   for (auto &it : msgs) {
     if (it->cb_) {
-      it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(),
+      it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
               it->msg_.data(), it->msg_.size(), it->user_report_time_,
               it->user_client_ip_.data());
     }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index f9150f3602..2d1a9e6315 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -38,11 +38,8 @@ private:
   uint32_t cur_len_;
   AtomicInt pack_err_;
   uint64_t data_time_;
-  std::string inlong_group_id_;
-  std::string inlong_stream_id_;
   uint16_t groupId_num_;
   uint16_t streamId_num_;
-  std::string topic_desc_;
   uint32_t msg_type_;
   mutable std::mutex mutex_;
 
@@ -50,17 +47,17 @@ private:
   uint64_t last_pack_time_;
 
   uint64_t max_recv_size_;
+  std::string group_key_;
 
-  bool ShouldPack(int32_t msg_len);
   int32_t DoDispatchMsg();
   void AddMsg(const std::string &msg, std::string client_ip,
-              int64_t report_time, UserCallBack call_back);
+              int64_t report_time, UserCallBack call_back,const std::string 
&groupId,
+              const std::string &streamId);
   bool IsZipAndOperate(std::string &res, uint32_t real_cur_len);
   inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
 
 public:
-  RecvGroup(const std::string &groupId, const std::string &streamId,
-            std::shared_ptr<SendManager> send_manager);
+  RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> 
send_manager);
   ~RecvGroup();
 
   int32_t SendData(const std::string &msg, const std::string &groupId,
@@ -72,8 +69,6 @@ public:
 
   char *data() const { return pack_buf_; }
 
-  std::string groupId() const { return inlong_group_id_; }
-
   std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs);
   void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs);
 };
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index 4591953fa0..f317163799 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -34,8 +34,8 @@ SendGroup::SendGroup(std::string send_group_key)
   if (max_send_queue_num_ <= 0) {
     max_send_queue_num_ = kDefaultQueueSize;
   }
-  LOG_INFO("SendGroup: " << send_group_key_
-                         << ", max send queue num: " << max_send_queue_num_);
+  LOG_INFO("SendGroup:" << send_group_key_
+                         << ",max send queue num:" << max_send_queue_num_);
   dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
   load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
   heart_beat_interval_ =
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index f2e887d29c..084905e2c9 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -295,7 +295,7 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string 
&cluster_id,
   proxy_info_vec = it->second;
   return SdkCode::kSuccess;
 }
-std::string ProxyManager::GetSendGroupKey(const std::string &groupid) {
+std::string ProxyManager::GetGroupKey(const std::string &groupid) {
   if (SdkConfig::getInstance()->enable_isolation_) {
     return groupid;
   }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 18856f61ec..653185798c 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -66,7 +66,7 @@ public:
   int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec 
&proxy_info_vec);
   int32_t GetProxyByClusterId(const std::string &cluster_id,
                               ProxyInfoVec &proxy_info_vec);
-  std::string GetSendGroupKey(const std::string &groupid);
+  std::string GetGroupKey(const std::string &groupid);
   bool HasProxy(const std::string &inlong_group_id);
   bool CheckGroupid(const std::string &groupid);
   bool CheckClusterId(const std::string &cluster_id);
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
index 077dd4464b..789e98e7e6 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
@@ -17,6 +17,7 @@
 
 #include "recv_manager.h"
 #include "../utils/utils.h"
+#include "proxy_manager.h"
 
 namespace inlong {
 RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager)
@@ -24,8 +25,9 @@ RecvManager::RecvManager(std::shared_ptr<SendManager> 
send_manager)
       exit_flag_(false) {
   dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;
 
-  max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
-                              SdkConfig::getInstance()->max_stream_id_num_;
+  max_groupid_streamid_num_ =
+      std::max(SdkConfig::getInstance()->max_group_id_num_,
+               SdkConfig::getInstance()->max_stream_id_num_);
   LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);
 
   check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
@@ -54,10 +56,13 @@ RecvManager::~RecvManager() {
   }
 }
 void RecvManager::Run() { io_context_.run(); }
-RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
-                                       const std::string &streamId) {
+RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId) {
   std::lock_guard<std::mutex> lck(mutex_);
-  auto it = recv_group_map_.find(groupId + streamId);
+  std::string group_key = ProxyManager::GetInstance()->GetGroupKey(groupId);
+  if (group_key.empty()) {
+    return nullptr;
+  }
+  auto it = recv_group_map_.find(group_key);
   if (it != recv_group_map_.end()) {
     return it->second;
   } else {
@@ -66,8 +71,8 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string 
&groupId,
     }
 
     RecvGroupPtr recv_group =
-        std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
-    recv_group_map_.emplace(groupId + streamId, recv_group);
+        std::make_shared<RecvGroup>(group_key, send_manager_);
+    recv_group_map_.emplace(group_key, recv_group);
     return recv_group;
   }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
index 14de991ac4..4a69d2939a 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
@@ -52,7 +52,7 @@ public:
   RecvManager(std::shared_ptr<SendManager> send_manager);
   ~RecvManager();
   void DispatchData(std::error_code error);
-  RecvGroupPtr GetRecvGroup(const std::string &bid, const std::string &tid);
+  RecvGroupPtr GetRecvGroup(const std::string &bid);
 };
 } // namespace inlong
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 475fe14b3f..6104411b9a 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -28,18 +28,16 @@ SendManager::SendManager() : send_group_idx_(0) {
              << SdkConfig::getInstance()->inlong_group_ids_[i]
              << " send group num:"
              << SdkConfig::getInstance()->per_groupid_thread_nums_);
-    std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey(
+    std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey(
         SdkConfig::getInstance()->inlong_group_ids_[i]);
     AddSendGroup(send_group_key);
   }
 }
 
-SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
-  std::string send_group_key =
-      ProxyManager::GetInstance()->GetSendGroupKey(group_id);
-  SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key);
+SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) {
+  SendGroupPtr send_group_ptr = DoGetSendGroup(group_key);
   if (send_group_ptr == nullptr) {
-    AddSendGroup(send_group_key);
+    AddSendGroup(group_key);
   }
   return send_group_ptr;
 }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 66e93d67ce..1f2970b17d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -87,7 +87,7 @@ public:
   void doUserCallBack() {
     for (auto it : user_msg_vector_) {
       if (it->cb_) {
-        it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(),
+        it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
                 it->msg_.data(), it->msg_.size(), it->user_report_time_,
                 it->user_client_ip_.data());
       }

Reply via email to