This is an automated email from the ASF dual-hosted git repository.

vinner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73881b4b12 [INLONG-10861][SDK] Optimize the coredump caused by the 
DataProxy C++ SDK (#10862)
73881b4b12 is described below

commit 73881b4b12ffe4939e9b14bd155f576398b77f22
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Fri Aug 23 10:08:18 2024 +0800

    [INLONG-10861][SDK] Optimize the coredump caused by the DataProxy C++ SDK 
(#10862)
---
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          | 23 +++++-------
 .../dataproxy-sdk-cpp/src/core/api_imp.h           |  5 ++-
 .../dataproxy-sdk-cpp/src/manager/buffer_manager.h | 11 +++++-
 .../dataproxy-sdk-cpp/src/manager/metric_manager.h | 18 +++++++---
 .../dataproxy-sdk-cpp/src/manager/msg_manager.h    | 12 +++++--
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc |  4 +++
 .../dataproxy-sdk-cpp/src/utils/utils.cc           | 42 ++++++----------------
 7 files changed, 58 insertions(+), 57 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 2109af502e..a9522eee69 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -34,8 +34,6 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
     return SdkCode::kMultiInit;
   }
 
-  user_exit_flag_.getAndSet(0);
-
   if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) {
     return SdkCode::kErrorInit;
   }
@@ -48,6 +46,9 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
 
 int32_t ApiImp::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
                      UserCallBack call_back) {
+  if (inited_ == false || exit_flag_) {
+    return SdkCode::kSendBeforeInit;
+  }
   int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
   if(code !=SdkCode::kSuccess){
     return code;
@@ -57,6 +58,9 @@ int32_t ApiImp::Send(const char *inlong_group_id, const char 
*inlong_stream_id,
 }
 int32_t ApiImp::Send(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len,
                      int64_t data_time, UserCallBack call_back) {
+  if (inited_ == false || exit_flag_) {
+    return SdkCode::kSendBeforeInit;
+  }
   int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
   if(code !=SdkCode::kSuccess){
     return code;
@@ -73,10 +77,6 @@ int32_t ApiImp::ValidateParams(const char *inlong_group_id, 
const char *inlong_s
   if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == 
nullptr || msg_len <= 0) {
     return SdkCode::kInvalidInput;
   }
-
-  if (inited_ == false) {
-    return SdkCode::kSendBeforeInit;
-  }
   return SdkCode::kSuccess;
 }
 
@@ -99,10 +99,7 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id, 
const std::string&
 }
 
 int32_t ApiImp::CloseApi(int32_t max_waitms) {
-  if (!__sync_bool_compare_and_swap(&init_flag_, false, true)) {
-    LOG_ERROR("sdk has been closed! .");
-    return SdkCode::kMultiExits;
-  }
+  exit_flag_ = true;
   std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms));
   return SdkCode::kSuccess;
 }
@@ -126,8 +123,7 @@ int32_t ApiImp::DoInit() {
 }
 
 int32_t ApiImp::CheckData(const std::string& inlong_group_id, const 
std::string& inlong_stream_id, const std::string& msg) {
-  if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
-    LOG_ERROR("capi has been closed, Init first and then send");
+  if (!init_succeed_) {
     return SdkCode::kSendAfterClose;
   }
 
@@ -154,14 +150,13 @@ int32_t ApiImp::InitManager() {
 
   recv_manager_ = std::make_shared<RecvManager>(send_manager_);
   if (!recv_manager_) {
-    LOG_ERROR("fail to Init global packqueue");
     return SdkCode::kErrorInit;
   }
   init_succeed_ = true;
   return SdkCode::kSuccess;
 }
 int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) {
-  if (inited_ == false) {
+  if (!inited_) {
     return SdkCode::kSendBeforeInit;
   }
   for (auto group_id : group_ids) {
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index cd6a9757a4..9671d8fc2b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -53,11 +53,10 @@ class ApiImp {
 
   int32_t ValidateParams(const char *inlong_group_id, const char 
*inlong_stream_id, const char *msg, int32_t msg_len);
 
-  AtomicInt user_exit_flag_{0};
-  volatile bool init_flag_ = false;
   volatile bool inited_ = false;
   volatile bool init_succeed_ = false;
-  AtomicInt buf_full_{0};
+  volatile bool exit_flag_ = false;
+
   uint32_t max_msg_length_;
   std::string local_ip_;
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
index 428188e38e..ed0c253baa 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
@@ -31,6 +31,7 @@ class BufferManager {
   std::queue<SendBufferPtrT> buffer_queue_;
   mutable std::mutex mutex_;
   uint32_t queue_limit_;
+  bool exit_= false;
   BufferManager() {
     uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
                                        SdkConfig::getInstance()->pack_size_);
@@ -54,6 +55,11 @@ class BufferManager {
       AddSendBuffer(send_buffer);
     }
   }
+  ~BufferManager(){
+    std::lock_guard<std::mutex> lck(mutex_);
+    exit_ = true;
+    LOG_INFO("Buffer manager exited");
+  }
 
  public:
   static BufferManager *GetInstance() {
@@ -61,6 +67,9 @@ class BufferManager {
     return &instance;
   }
   SendBufferPtrT GetSendBuffer() {
+    if(exit_){
+      return nullptr;
+    }
     std::lock_guard<std::mutex> lck(mutex_);
     if (buffer_queue_.empty()) {
       return nullptr;
@@ -70,7 +79,7 @@ class BufferManager {
     return buf;
   }
   void AddSendBuffer(const SendBufferPtrT &send_buffer) {
-    if (nullptr == send_buffer) {
+    if (nullptr == send_buffer || exit_) {
       return;
     }
     send_buffer->releaseBuf();
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
index 5dc013f2d5..58b9481e0f 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
@@ -24,6 +24,7 @@
 #include "../config/sdk_conf.h"
 #include "../metric/environment.h"
 #include "../metric/metric.h"
+#include "../utils/logger.h"
 
 #ifndef INLONG_METRIC_MANAGER_H
 #define INLONG_METRIC_MANAGER_H
@@ -40,10 +41,6 @@ class MetricManager {
   Environment environment_;
   std::string coreParma_;
 
-  MetricManager() {
-
-  }
-
  public:
   static MetricManager *GetInstance() {
     static MetricManager instance;
@@ -54,23 +51,35 @@ class MetricManager {
   void PrintMetric();
   void Run();
   void UpdateMetric(const std::string &stat_key, Metric &stat) {
+    if(!running_){
+      return;
+    }
     std::lock_guard<std::mutex> lck(mutex_);
     stat_map_[stat_key].Update(stat);
   }
 
   void AddReceiveBufferFullCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    if(!running_){
+      return;
+    }
     std::lock_guard<std::mutex> lck(mutex_);
     std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
     stat_map_[stat_key].AddReceiveBufferFullCount(count);
   }
 
   void AddTooLongMsgCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    if (!running_) {
+      return;
+    }
     std::lock_guard<std::mutex> lck(mutex_);
     std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
     stat_map_[stat_key].AddTooLongMsgCount(count);
   }
 
   void AddMetadataFailCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    if (!running_) {
+      return;
+    }
     std::lock_guard<std::mutex> lck(mutex_);
     std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
     stat_map_[stat_key].AddMetadataFailCount(count);
@@ -87,6 +96,7 @@ class MetricManager {
     if (update_thread_.joinable()) {
       update_thread_.join();
     }
+    LOG_INFO("Metric manager exited");
   }
 };
 }  // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
index 26aa387083..c304ead157 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
@@ -32,6 +32,7 @@ class MsgManager {
   mutable std::mutex mutex_;
   uint32_t queue_limit_;
   bool enable_share_msg_;
+  bool exit_= false;
   MsgManager() {
     uint32_t data_capacity_ = 
std::max(SdkConfig::getInstance()->max_msg_size_, 
SdkConfig::getInstance()->pack_size_);
     uint32_t buffer_num = SdkConfig::getInstance()->recv_buf_size_ / 
data_capacity_;
@@ -48,6 +49,11 @@ class MsgManager {
       AddMsg(msg_ptr);
     }
   }
+  ~MsgManager(){
+    std::lock_guard<std::mutex> lck(mutex_);
+    exit_ = true;
+    LOG_INFO("Msg manager exited");
+  }
 
  public:
   static MsgManager *GetInstance() {
@@ -55,7 +61,7 @@ class MsgManager {
     return &instance;
   }
   SdkMsgPtr GetMsg() {
-    if (!enable_share_msg_) {
+    if (!enable_share_msg_ || exit_) {
       return nullptr;
     }
     std::lock_guard<std::mutex> lck(mutex_);
@@ -67,7 +73,7 @@ class MsgManager {
     return buf;
   }
   void AddMsg(const SdkMsgPtr &msg_ptr) {
-    if (nullptr == msg_ptr || !enable_share_msg_) {
+    if (nullptr == msg_ptr || !enable_share_msg_ || exit_) {
       return;
     }
     std::lock_guard<std::mutex> lck(mutex_);
@@ -78,7 +84,7 @@ class MsgManager {
   }
 
   void AddMsg(const std::vector<SdkMsgPtr> &user_msg_vector) {
-    if (!enable_share_msg_) {
+    if (!enable_share_msg_ || exit_) {
       return;
     }
     std::lock_guard<std::mutex> lck(mutex_);
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 3db331aeda..c84435ea89 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -21,6 +21,7 @@
 
 #include "api_code.h"
 #include <fstream>
+#include <curl/curl.h>
 
 #include "../config/ini_help.h"
 #include "../utils/capi_constant.h"
@@ -41,11 +42,14 @@ ProxyManager::~ProxyManager() {
   if (update_conf_thread_.joinable()) {
     update_conf_thread_.join();
   }
+
+  curl_global_cleanup();
 }
 void ProxyManager::Init() {
   timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
   last_update_time_ = Utils::getCurrentMsTime();
   if (__sync_bool_compare_and_swap(&inited_, false, true)) {
+    curl_global_init(CURL_GLOBAL_ALL);
     ReadLocalCache();
     update_conf_thread_ = std::thread(&ProxyManager::Update, this);
   }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
index b3856b440b..7fe5ba4dc5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
@@ -309,7 +309,6 @@ int32_t Utils::requestUrl(const std::string &url, 
std::string &urlByDNS,
   }
 
   CURL *curl = NULL;
-  curl_global_init(CURL_GLOBAL_ALL);
 
   curl = curl_easy_init();
   if (!curl) {
@@ -330,9 +329,7 @@ int32_t Utils::requestUrl(const std::string &url, 
std::string &urlByDNS,
   LOG_INFO("request from tdm:" << res);
   if (ret != 0) {
     LOG_ERROR("failed to request data from " << urlByDNS);
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
+    if (curl) curl_easy_cleanup(curl);
 
     return SdkCode::kErrorCURL;
   }
@@ -341,25 +338,17 @@ int32_t Utils::requestUrl(const std::string &url, 
std::string &urlByDNS,
   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
   if (code != 200) {
     LOG_ERROR("tdm responsed with code " << code);
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
-
+    if (curl) curl_easy_cleanup(curl);
     return SdkCode::kErrorCURL;
   }
 
   if (res.empty()) {
     LOG_ERROR("tdm return empty data");
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
-
+    if (curl) curl_easy_cleanup(curl);
     return SdkCode::kErrorCURL;
   }
 
-  if (curl)
-    curl_easy_cleanup(curl);
-  curl_global_cleanup();
+  if (curl) curl_easy_cleanup(curl);
 
   return 0;
 }
@@ -406,8 +395,6 @@ int32_t Utils::requestUrl(std::string &res, const 
HttpRequest *request) {
   CURL *curl = NULL;
   struct curl_slist *list = NULL;
 
-  curl_global_init(CURL_GLOBAL_ALL);
-
   curl = curl_easy_init();
   if (!curl) {
     LOG_ERROR("failed to init curl object");
@@ -415,8 +402,7 @@ int32_t Utils::requestUrl(std::string &res, const 
HttpRequest *request) {
   }
 
   // http header
-  list = curl_slist_append(list,
-                           "Content-Type: application/x-www-form-urlencoded");
+  list = curl_slist_append(list,"Content-Type: 
application/x-www-form-urlencoded");
 
   if (request->need_auth && !request->auth_id.empty() &&
       !request->auth_key.empty()) {
@@ -446,9 +432,7 @@ int32_t Utils::requestUrl(std::string &res, const 
HttpRequest *request) {
   if (ret != 0) {
     LOG_ERROR(curl_easy_strerror(ret));
     LOG_ERROR("failed to request data from " << request->url.c_str());
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
+    if (curl) curl_easy_cleanup(curl);
 
     return SdkCode::kErrorCURL;
   }
@@ -456,26 +440,20 @@ int32_t Utils::requestUrl(std::string &res, const 
HttpRequest *request) {
   int32_t code;
   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
   if (code != 200) {
-    LOG_ERROR("tdm responsed with code " << code);
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
+    if (curl) curl_easy_cleanup(curl);
 
     return SdkCode::kErrorCURL;
   }
 
   if (res.empty()) {
-    LOG_ERROR("tdm return empty data");
-    if (curl)
-      curl_easy_cleanup(curl);
-    curl_global_cleanup();
+    LOG_ERROR("Empty response");
+    if (curl) curl_easy_cleanup(curl);
 
     return SdkCode::kErrorCURL;
   }
 
-  // clean work
+  // Clean work
   curl_easy_cleanup(curl);
-  curl_global_cleanup();
 
   return 0;
 }

Reply via email to