This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ab3f2cc416 [INLONG-9293][SDK] Optimize the problem that the more inlong grouids there are, the more memory is consumed. (#9295) ab3f2cc416 is described below commit ab3f2cc4162dfe633d5dabf33f0f4c828c0d2e9d Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Thu Nov 16 09:40:20 2023 +0800 [INLONG-9293][SDK] Optimize the problem that the more inlong grouids there are, the more memory is consumed. (#9295) --- .../dataproxy-sdk-cpp/release/inc/api_code.h | 2 +- .../dataproxy-sdk-cpp/release/inc/sdk_msg.h | 10 ++- .../dataproxy-sdk-cpp/src/core/api_imp.cc | 2 +- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 93 ++++++++++++---------- .../dataproxy-sdk-cpp/src/group/recv_group.h | 13 +-- .../dataproxy-sdk-cpp/src/group/send_group.cc | 4 +- .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 2 +- .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 2 +- .../dataproxy-sdk-cpp/src/manager/recv_manager.cc | 19 +++-- .../dataproxy-sdk-cpp/src/manager/recv_manager.h | 2 +- .../dataproxy-sdk-cpp/src/manager/send_manager.cc | 10 +-- .../dataproxy-sdk-cpp/src/utils/send_buffer.h | 2 +- 12 files changed, 87 insertions(+), 74 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h index 227f6f9e24..0228f9c96d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h @@ -31,7 +31,7 @@ enum SdkCode { kInvalidBid = 12, kFailGetBufferPool = 13, kFailGetSendBuf = 14, - kFailWriteToBuf = 15, + kMsgEmpty = 15, kErrorCURL = 16, kErrorParseJson = 17, kFailGetRevGroup = 18, diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h index ae77b7efcf..df7e90f4ac 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h @@ -36,12 +36,18 @@ struct SdkMsg { std::string user_client_ip_; std::string data_pack_format_attr_; + + std::string inlong_group_id_; + std::string inlong_stream_id_; + SdkMsg(const std::string &mmsg, const std::string &mclient_ip, int64_t mreport_time, UserCallBack mcb, const std::string &attr, - const std::string &u_ip, int64_t u_time) + const std::string &u_ip, int64_t u_time,const std::string& inlong_group_id,const std::string& inlong_stream_id) : msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time), cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip), - data_pack_format_attr_(attr) {} + data_pack_format_attr_(attr), + inlong_group_id_(inlong_group_id), + inlong_stream_id_(inlong_stream_id){} }; using SdkMsgPtr = std::shared_ptr<SdkMsg>; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index fd5b1b8312..f058142841 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -75,7 +75,7 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id, ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true); auto recv_group = - recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id); + recv_manager_->GetRecvGroup(inlong_group_id); if (recv_group == nullptr) { LOG_ERROR("fail to get recv group, inlong_group_id:" << inlong_group_id << " inlong_stream_id:" << inlong_stream_id); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index 9d41942e97..79931bd434 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -26,25 +26,22 @@ namespace inlong { const uint32_t DEFAULT_PACK_ATTR = 400; -RecvGroup::RecvGroup(const std::string &inlong_group_id, - const std::string &inlong_stream_id, - std::shared_ptr<SendManager> send_manager) - : cur_len_(0), inlong_group_id_(inlong_group_id), - inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0), +RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager) + : cur_len_(0), groupId_num_(0), streamId_num_(0), msg_type_(SdkConfig::getInstance()->msg_type_), data_capacity_(SdkConfig::getInstance()->buf_size_), - send_manager_(send_manager) { + send_manager_(send_manager),group_key_(group_key) { data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR; pack_buf_ = new char[data_capacity_]; memset(pack_buf_, 0x0, data_capacity_); - topic_desc_ = - "groupId=" + inlong_group_id_ + "&streamId=" + inlong_stream_id_; data_time_ = 0; last_pack_time_ = Utils::getCurrentMsTime(); max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_; + + LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_); } RecvGroup::~RecvGroup() { @@ -64,7 +61,7 @@ int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId, return SdkCode::kRecvBufferFull; } - AddMsg(msg, client_ip, report_time, call_back); + AddMsg(msg, client_ip, report_time, call_back,groupId,streamId); return SdkCode::kSuccess; } @@ -72,15 +69,15 @@ int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId, int32_t RecvGroup::DoDispatchMsg() { last_pack_time_ = Utils::getCurrentMsTime(); std::lock_guard<std::mutex> lck(mutex_); - if (inlong_group_id_.empty()) { + if (group_key_.empty()) { LOG_ERROR("groupId is empty, check!!"); return SdkCode::kInvalidInput; } if (msgs_.empty()) { LOG_ERROR("no msg in msg_set, check!"); - return SdkCode::kFailGetRevGroup; + return SdkCode::kMsgEmpty; } - auto send_group = send_manager_->GetSendGroup(inlong_group_id_); + auto send_group = send_manager_->GetSendGroup(group_key_); if (send_group == nullptr) { LOG_ERROR("failed to get send_buf, something gets wrong, checkout!"); return SdkCode::kFailGetSendBuf; @@ -93,38 +90,54 @@ int32_t RecvGroup::DoDispatchMsg() { } uint32_t total_length = 0; - std::vector<SdkMsgPtr> msgs_to_dispatch; + uint64_t max_tid_size = 0; + std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch; + std::unordered_map<std::string, uint64_t> tid_stat; while (!msgs_.empty()) { SdkMsgPtr msg = msgs_.front(); - if (msg->msg_.size() + total_length + constants::ATTR_LENGTH > - SdkConfig::getInstance()->pack_size_) { - break; + if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > SdkConfig::getInstance()->pack_size_) { + if (!msgs_to_dispatch.empty()) { + break; + } } - msgs_to_dispatch.push_back(msg); + std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_; + msgs_to_dispatch[msg_key].push_back(msg); msgs_.pop(); + total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH; + + if (tid_stat.find(msg_key) == tid_stat.end()) { + tid_stat[msg_key] = 0; + } + tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + constants::ATTR_LENGTH; + + max_tid_size = std::max(tid_stat[msg_key], max_tid_size); } cur_len_ = cur_len_ - total_length; - std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msgs_to_dispatch); + for (auto it : msgs_to_dispatch) { + std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second); - ResetPackBuf(); + ResetPackBuf(); - if (send_buffer == nullptr) { - CallbalkToUsr(msgs_to_dispatch); - return SdkCode::kSuccess; - } + if (send_buffer == nullptr) { + CallbalkToUsr(it.second); + continue; + } - int ret = send_group->PushData(send_buffer); - if (ret != SdkCode::kSuccess) { - CallbalkToUsr(msgs_to_dispatch); + int ret = send_group->PushData(send_buffer); + if (ret != SdkCode::kSuccess) { + CallbalkToUsr(it.second); + } } + return SdkCode::kSuccess; } void RecvGroup::AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back) { + int64_t report_time, UserCallBack call_back,const std::string &groupId, + const std::string &streamId) { if (Utils::isLegalTime(report_time)) data_time_ = report_time; else { @@ -142,19 +155,11 @@ void RecvGroup::AddMsg(const std::string &msg, std::string client_ip, "&__addcol2__ip=" + client_ip; msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back, data_pack_format_attr, user_client_ip, - user_report_time)); + user_report_time,groupId,streamId)); cur_len_ += msg.size() + constants::ATTR_LENGTH; } -bool RecvGroup::ShouldPack(int32_t msg_len) { - if (0 == cur_len_ || msgs_.empty()) - return false; - if (msg_len + cur_len_ > SdkConfig::getInstance()->pack_size_) - return true; - return false; -} - bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, uint32_t &out_len, uint32_t uniq_id) { if (pack_data == nullptr) { @@ -220,7 +225,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, streamId_num_ == 0) { groupId_num = 0; streamId_num = 0; - groupId_streamId_char = topic_desc_; + groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ + + "&streamId=" + msgs[0]->inlong_stream_id_; char_groupId_flag = 0x4; } else { groupId_num = groupId_num_; @@ -240,7 +246,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, "&node1ip=" + SdkConfig::getInstance()->local_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); } else { - attr = topic_desc_; + attr = "groupId=" + msgs[0]->inlong_group_id_ + + "&streamId=" + msgs[0]->inlong_stream_id_; } *(uint16_t *)bodyBegin = htons(attr.size()); bodyBegin += sizeof(uint16_t); @@ -290,7 +297,9 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, // attr std::string attr; - attr = topic_desc_; + attr = "groupId=" + msgs[0]->inlong_group_id_ + + "&streamId=" + msgs[0]->inlong_stream_id_; + attr += "&dt=" + std::to_string(data_time_); attr += "&mid=" + std::to_string(uniq_id); if (isSnappy) @@ -354,8 +363,8 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) { } send_buffer->setLen(len); send_buffer->setMsgCnt(msg_cnt); - send_buffer->setInlongGroupId(inlong_group_id_); - send_buffer->setStreamId(inlong_stream_id_); + send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_); + send_buffer->setStreamId(msgs[0]->inlong_stream_id_); send_buffer->setUniqId(uniq_id); send_buffer->setIsPacked(true); for (auto it : msgs) { @@ -368,7 +377,7 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) { void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) { for (auto &it : msgs) { if (it->cb_) { - it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(), + it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), it->msg_.data(), it->msg_.size(), it->user_report_time_, it->user_client_ip_.data()); } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h index f9150f3602..2d1a9e6315 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h @@ -38,11 +38,8 @@ private: uint32_t cur_len_; AtomicInt pack_err_; uint64_t data_time_; - std::string inlong_group_id_; - std::string inlong_stream_id_; uint16_t groupId_num_; uint16_t streamId_num_; - std::string topic_desc_; uint32_t msg_type_; mutable std::mutex mutex_; @@ -50,17 +47,17 @@ private: uint64_t last_pack_time_; uint64_t max_recv_size_; + std::string group_key_; - bool ShouldPack(int32_t msg_len); int32_t DoDispatchMsg(); void AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back); + int64_t report_time, UserCallBack call_back,const std::string &groupId, + const std::string &streamId); bool IsZipAndOperate(std::string &res, uint32_t real_cur_len); inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); } public: - RecvGroup(const std::string &groupId, const std::string &streamId, - std::shared_ptr<SendManager> send_manager); + RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager); ~RecvGroup(); int32_t SendData(const std::string &msg, const std::string &groupId, @@ -72,8 +69,6 @@ public: char *data() const { return pack_buf_; } - std::string groupId() const { return inlong_group_id_; } - std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs); void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs); }; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc index 4591953fa0..f317163799 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc @@ -34,8 +34,8 @@ SendGroup::SendGroup(std::string send_group_key) if (max_send_queue_num_ <= 0) { max_send_queue_num_ = kDefaultQueueSize; } - LOG_INFO("SendGroup: " << send_group_key_ - << ", max send queue num: " << max_send_queue_num_); + LOG_INFO("SendGroup:" << send_group_key_ + << ",max send queue num:" << max_send_queue_num_); dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_; load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_; heart_beat_interval_ = diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index f2e887d29c..084905e2c9 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -295,7 +295,7 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id, proxy_info_vec = it->second; return SdkCode::kSuccess; } -std::string ProxyManager::GetSendGroupKey(const std::string &groupid) { +std::string ProxyManager::GetGroupKey(const std::string &groupid) { if (SdkConfig::getInstance()->enable_isolation_) { return groupid; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 18856f61ec..653185798c 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -66,7 +66,7 @@ public: int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec); int32_t GetProxyByClusterId(const std::string &cluster_id, ProxyInfoVec &proxy_info_vec); - std::string GetSendGroupKey(const std::string &groupid); + std::string GetGroupKey(const std::string &groupid); bool HasProxy(const std::string &inlong_group_id); bool CheckGroupid(const std::string &groupid); bool CheckClusterId(const std::string &cluster_id); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc index 077dd4464b..789e98e7e6 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc @@ -17,6 +17,7 @@ #include "recv_manager.h" #include "../utils/utils.h" +#include "proxy_manager.h" namespace inlong { RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager) @@ -24,8 +25,9 @@ RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager) exit_flag_(false) { dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_; - max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ * - SdkConfig::getInstance()->max_stream_id_num_; + max_groupid_streamid_num_ = + std::max(SdkConfig::getInstance()->max_group_id_num_, + SdkConfig::getInstance()->max_stream_id_num_); LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_); check_timer_ = std::make_shared<asio::steady_timer>(io_context_); @@ -54,10 +56,13 @@ RecvManager::~RecvManager() { } } void RecvManager::Run() { io_context_.run(); } -RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId, - const std::string &streamId) { +RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId) { std::lock_guard<std::mutex> lck(mutex_); - auto it = recv_group_map_.find(groupId + streamId); + std::string group_key = ProxyManager::GetInstance()->GetGroupKey(groupId); + if (group_key.empty()) { + return nullptr; + } + auto it = recv_group_map_.find(group_key); if (it != recv_group_map_.end()) { return it->second; } else { @@ -66,8 +71,8 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId, } RecvGroupPtr recv_group = - std::make_shared<RecvGroup>(groupId, streamId, send_manager_); - recv_group_map_.emplace(groupId + streamId, recv_group); + std::make_shared<RecvGroup>(group_key, send_manager_); + recv_group_map_.emplace(group_key, recv_group); return recv_group; } } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h index 14de991ac4..4a69d2939a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h @@ -52,7 +52,7 @@ public: RecvManager(std::shared_ptr<SendManager> send_manager); ~RecvManager(); void DispatchData(std::error_code error); - RecvGroupPtr GetRecvGroup(const std::string &bid, const std::string &tid); + RecvGroupPtr GetRecvGroup(const std::string &bid); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc index 475fe14b3f..6104411b9a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc @@ -28,18 +28,16 @@ SendManager::SendManager() : send_group_idx_(0) { << SdkConfig::getInstance()->inlong_group_ids_[i] << " send group num:" << SdkConfig::getInstance()->per_groupid_thread_nums_); - std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey( + std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey( SdkConfig::getInstance()->inlong_group_ids_[i]); AddSendGroup(send_group_key); } } -SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) { - std::string send_group_key = - ProxyManager::GetInstance()->GetSendGroupKey(group_id); - SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key); +SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) { + SendGroupPtr send_group_ptr = DoGetSendGroup(group_key); if (send_group_ptr == nullptr) { - AddSendGroup(send_group_key); + AddSendGroup(group_key); } return send_group_ptr; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h index 66e93d67ce..1f2970b17d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h @@ -87,7 +87,7 @@ public: void doUserCallBack() { for (auto it : user_msg_vector_) { if (it->cb_) { - it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(), + it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), it->msg_.data(), it->msg_.size(), it->user_report_time_, it->user_client_ip_.data()); }