This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f01d49347e [INLONG-10780][SDK] Optimize memory management for 
DataProxy CPP SDK (#10792)
f01d49347e is described below

commit f01d49347e6932dea5ae95b37111a5f9bdde0244
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Wed Aug 14 16:33:32 2024 +0800

    [INLONG-10780][SDK] Optimize memory management for DataProxy CPP SDK 
(#10792)
---
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       | 46 +++++++++---
 .../dataproxy-sdk-cpp/src/config/sdk_conf.h        |  3 +
 .../dataproxy-sdk-cpp/src/manager/buffer_manager.h | 85 ++++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |  2 +
 4 files changed, 126 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 319262adc2..68cc122c4c 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -38,6 +38,9 @@ bool SdkConfig::ParseConfig(const std::string &config_path) {
   // Guaranteed to only parse the configuration file once
   if (!__sync_bool_compare_and_swap(&parsed_, false, true)) {
     LOG_INFO("ParseConfig has  parsed .");
+    if (++instance_num_ > max_instance_) {
+      return false;
+    }
     return true;
   }
 
@@ -92,7 +95,7 @@ void SdkConfig::defaultInit() {
   load_balance_interval_ = constants::kLoadBalanceInterval;
   heart_beat_interval_ = constants::kHeartBeatInterval;
   enable_balance_ = constants::kEnableBalance;
-  isolation_level_=constants::IsolationLevel::kLevelSecond;
+  isolation_level_ = constants::IsolationLevel::kLevelSecond;
 
   // cache parameter
   send_buf_size_ = constants::kSendBufSize;
@@ -132,6 +135,10 @@ void SdkConfig::defaultInit() {
   enable_setaffinity_ = constants::kEnableSetAffinity;
   mask_cpu_affinity_ = constants::kMaskCPUAffinity;
   extend_field_ = constants::kExtendField;
+
+  need_auth_ = constants::kNeedAuth;
+  max_instance_ = constants::kMaxInstance;
+  instance_num_ = 1;
 }
 
 void SdkConfig::InitThreadParam(const rapidjson::Value &doc) {
@@ -212,6 +219,14 @@ void SdkConfig::InitCacheParam(const rapidjson::Value 
&doc) {
   } else {
     max_stream_id_num_ = constants::kMaxGroupIdNum;
   }
+
+  // max_cache_num
+  if (doc.HasMember("max_cache_num") && doc["max_cache_num"].IsInt() && 
doc["max_cache_num"].GetInt() >= 0) {
+    const rapidjson::Value &obj = doc["max_cache_num"];
+    max_cache_num_ = obj.GetInt();
+  } else {
+    max_cache_num_ = constants::kMaxCacheNum;
+  }
 }
 
 void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
@@ -431,9 +446,10 @@ void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
   } else {
     need_auth_ = constants::kNeedAuth;
     LOG_INFO("need_auth is not expect, then use default:%s" << need_auth_
-                 ? "true"
-                 : "false");
+             ? "true"
+             : "false");
   }
+
 }
 void SdkConfig::OthersParam(const rapidjson::Value &doc) {
   // ser_ip
@@ -475,12 +491,20 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
   } else {
     extend_field_ = constants::kExtendField;
   }
+
+  // instance num
+  if (doc.HasMember("max_instance") && doc["max_instance"].IsInt() && 
doc["max_instance"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["max_instance"];
+    max_instance_ = obj.GetInt();
+  } else {
+    max_instance_ = constants::kMaxInstance;
+  }
 }
 
-bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& 
localhost) {
+bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string 
&localhost) {
   int32_t sockfd;
   int32_t ip_num = 0;
-  char  buf[1024] = {0};
+  char buf[1024] = {0};
   struct ifreq *ifreq;
   struct ifreq if_flag;
   struct ifconf ifconf;
@@ -493,7 +517,7 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, 
std::string& localhos
   }
 
   ioctl(sockfd, SIOCGIFCONF, &ifconf);
-  ifreq  = (struct ifreq *)buf;
+  ifreq = (struct ifreq *) buf;
   ip_num = ifconf.ifc_len / sizeof(struct ifreq);
   for (int32_t i = 0; i < ip_num; i++, ifreq++) {
     if (ifreq->ifr_flags != AF_INET) {
@@ -511,11 +535,11 @@ bool SdkConfig::GetLocalIPV4Address(std::string& 
err_info, std::string& localhos
       continue;
     }
 
-    if (!strncmp(inet_ntoa(((struct 
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
+    if (!strncmp(inet_ntoa(((struct sockaddr_in *) 
&(ifreq->ifr_addr))->sin_addr),
                  "127.0.0.1", 7)) {
       continue;
     }
-    localhost = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
+    localhost = inet_ntoa(((struct sockaddr_in *) 
&(ifreq->ifr_addr))->sin_addr);
     close(sockfd);
     err_info = "Ok";
     return true;
@@ -545,8 +569,8 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
   LOG_INFO(
       "enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
-          ? "true"
-          : "false");
+      ? "true"
+      : "false");
   LOG_INFO("manager_update_interval:  minutes" << manager_update_interval_);
   LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
   LOG_INFO("max_tcp_num: " << max_proxy_num_);
@@ -566,6 +590,8 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("max_group_id_num: " << max_group_id_num_);
   LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
   LOG_INFO("isolation_level: " << isolation_level_);
+  LOG_INFO("max_instance: " << max_instance_);
+  LOG_INFO("max_cache_num: " << max_cache_num_);
 }
 
 } // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index 6d7b23dc21..f8581a1198 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -50,6 +50,9 @@ private:
   uint32_t send_buf_size_; // Send buf size, bid granularity
   uint32_t max_group_id_num_; // Send buf size, bid granularity
   uint32_t max_stream_id_num_; // Send buf size, bid granularity
+  uint32_t max_cache_num_;
+  uint32_t max_instance_;
+  uint32_t instance_num_;
 
   // thread parameters
   uint32_t per_groupid_thread_nums_; // Sending thread per groupid
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
new file mode 100644
index 0000000000..428188e38e
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <queue>
+
+#include "../config/sdk_conf.h"
+#include "../utils/send_buffer.h"
+
+#ifndef INLONG_BUFFER_MANAGER_H
+#define INLONG_BUFFER_MANAGER_H
+
+namespace inlong {
+class BufferManager {
+ private:
+  std::queue<SendBufferPtrT> buffer_queue_;
+  mutable std::mutex mutex_;
+  uint32_t queue_limit_;
+  BufferManager() {
+    uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
+                                       SdkConfig::getInstance()->pack_size_);
+    uint32_t buffer_num =
+        (SdkConfig::getInstance()->recv_buf_size_ / data_capacity_) *
+            SdkConfig::getInstance()->instance_num_;
+    queue_limit_ =
+        std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num);
+    LOG_INFO("Data capacity:"
+                 << data_capacity_ << ", buffer num: " << buffer_num
+                 << ", instance num: " << 
SdkConfig::getInstance()->instance_num_
+                 << ", limit: " << queue_limit_ << " ,max cache num: "
+                 << SdkConfig::getInstance()->max_cache_num_);
+    for ( uint32_t index = 0; index < queue_limit_; index++) {
+      std::shared_ptr<SendBuffer> send_buffer =
+          std::make_shared<SendBuffer>(data_capacity_);
+      if (send_buffer == nullptr) {
+        LOG_INFO("Buffer manager is null");
+        continue;
+      }
+      AddSendBuffer(send_buffer);
+    }
+  }
+
+ public:
+  static BufferManager *GetInstance() {
+    static BufferManager instance;
+    return &instance;
+  }
+  SendBufferPtrT GetSendBuffer() {
+    std::lock_guard<std::mutex> lck(mutex_);
+    if (buffer_queue_.empty()) {
+      return nullptr;
+    }
+    SendBufferPtrT buf = buffer_queue_.front();
+    buffer_queue_.pop();
+    return buf;
+  }
+  void AddSendBuffer(const SendBufferPtrT &send_buffer) {
+    if (nullptr == send_buffer) {
+      return;
+    }
+    send_buffer->releaseBuf();
+    std::lock_guard<std::mutex> lck(mutex_);
+    if (buffer_queue_.size() > queue_limit_) {
+      return;
+    }
+    buffer_queue_.emplace(send_buffer);
+  }
+};
+} // namespace inlong
+#endif // INLONG_BUFFER_MANAGER_H
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 1dbebd03db..399bd1b348 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -41,6 +41,8 @@ static const int32_t kSendBufSize = 10240000;
 static const int32_t kRecvBufSize = 10240000;
 static const uint32_t kMaxGroupIdNum = 50;
 static const uint32_t kMaxStreamIdNum = 100;
+static const uint32_t kMaxCacheNum = 10;
+static const uint32_t kMaxInstance = 30;
 
 static const int32_t kDispatchIntervalZip = 8;
 static const int32_t kDispatchIntervalSend = 10;

Reply via email to