This is an automated email from the ASF dual-hosted git repository. vinner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 73881b4b12 [INLONG-10861][SDK] Optimize the coredump caused by the DataProxy C++ SDK (#10862) 73881b4b12 is described below commit 73881b4b12ffe4939e9b14bd155f576398b77f22 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Fri Aug 23 10:08:18 2024 +0800 [INLONG-10861][SDK] Optimize the coredump caused by the DataProxy C++ SDK (#10862) --- .../dataproxy-sdk-cpp/src/core/api_imp.cc | 23 +++++------- .../dataproxy-sdk-cpp/src/core/api_imp.h | 5 ++- .../dataproxy-sdk-cpp/src/manager/buffer_manager.h | 11 +++++- .../dataproxy-sdk-cpp/src/manager/metric_manager.h | 18 +++++++--- .../dataproxy-sdk-cpp/src/manager/msg_manager.h | 12 +++++-- .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 4 +++ .../dataproxy-sdk-cpp/src/utils/utils.cc | 42 ++++++---------------- 7 files changed, 58 insertions(+), 57 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index 2109af502e..a9522eee69 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -34,8 +34,6 @@ int32_t ApiImp::InitApi(const char *config_file_path) { return SdkCode::kMultiInit; } - user_exit_flag_.getAndSet(0); - if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) { return SdkCode::kErrorInit; } @@ -48,6 +46,9 @@ int32_t ApiImp::InitApi(const char *config_file_path) { int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back) { + if (inited_ == false || exit_flag_) { + return SdkCode::kSendBeforeInit; + } int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); if(code !=SdkCode::kSuccess){ return code; @@ -57,6 +58,9 @@ int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, } int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, int64_t data_time, UserCallBack call_back) { + if (inited_ == false || exit_flag_) { + return SdkCode::kSendBeforeInit; + } int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); if(code !=SdkCode::kSuccess){ return code; @@ -73,10 +77,6 @@ int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char *inlong_s if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == nullptr || msg_len <= 0) { return SdkCode::kInvalidInput; } - - if (inited_ == false) { - return SdkCode::kSendBeforeInit; - } return SdkCode::kSuccess; } @@ -99,10 +99,7 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& } int32_t ApiImp::CloseApi(int32_t max_waitms) { - if (!__sync_bool_compare_and_swap(&init_flag_, false, true)) { - LOG_ERROR("sdk has been closed! ."); - return SdkCode::kMultiExits; - } + exit_flag_ = true; std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms)); return SdkCode::kSuccess; } @@ -126,8 +123,7 @@ int32_t ApiImp::DoInit() { } int32_t ApiImp::CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg) { - if (init_succeed_ == 0 || user_exit_flag_.get() == 1) { - LOG_ERROR("capi has been closed, Init first and then send"); + if (!init_succeed_) { return SdkCode::kSendAfterClose; } @@ -154,14 +150,13 @@ int32_t ApiImp::InitManager() { recv_manager_ = std::make_shared<RecvManager>(send_manager_); if (!recv_manager_) { - LOG_ERROR("fail to Init global packqueue"); return SdkCode::kErrorInit; } init_succeed_ = true; return SdkCode::kSuccess; } int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) { - if (inited_ == false) { + if (!inited_) { return SdkCode::kSendBeforeInit; } for (auto group_id : group_ids) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h index cd6a9757a4..9671d8fc2b 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h @@ -53,11 +53,10 @@ class ApiImp { int32_t ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len); - AtomicInt user_exit_flag_{0}; - volatile bool init_flag_ = false; volatile bool inited_ = false; volatile bool init_succeed_ = false; - AtomicInt buf_full_{0}; + volatile bool exit_flag_ = false; + uint32_t max_msg_length_; std::string local_ip_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h index 428188e38e..ed0c253baa 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h @@ -31,6 +31,7 @@ class BufferManager { std::queue<SendBufferPtrT> buffer_queue_; mutable std::mutex mutex_; uint32_t queue_limit_; + bool exit_= false; BufferManager() { uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); @@ -54,6 +55,11 @@ class BufferManager { AddSendBuffer(send_buffer); } } + ~BufferManager(){ + std::lock_guard<std::mutex> lck(mutex_); + exit_ = true; + LOG_INFO("Buffer manager exited"); + } public: static BufferManager *GetInstance() { @@ -61,6 +67,9 @@ class BufferManager { return &instance; } SendBufferPtrT GetSendBuffer() { + if(exit_){ + return nullptr; + } std::lock_guard<std::mutex> lck(mutex_); if (buffer_queue_.empty()) { return nullptr; @@ -70,7 +79,7 @@ class BufferManager { return buf; } void AddSendBuffer(const SendBufferPtrT &send_buffer) { - if (nullptr == send_buffer) { + if (nullptr == send_buffer || exit_) { return; } send_buffer->releaseBuf(); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h index 5dc013f2d5..58b9481e0f 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h @@ -24,6 +24,7 @@ #include "../config/sdk_conf.h" #include "../metric/environment.h" #include "../metric/metric.h" +#include "../utils/logger.h" #ifndef INLONG_METRIC_MANAGER_H #define INLONG_METRIC_MANAGER_H @@ -40,10 +41,6 @@ class MetricManager { Environment environment_; std::string coreParma_; - MetricManager() { - - } - public: static MetricManager *GetInstance() { static MetricManager instance; @@ -54,23 +51,35 @@ class MetricManager { void PrintMetric(); void Run(); void UpdateMetric(const std::string &stat_key, Metric &stat) { + if(!running_){ + return; + } std::lock_guard<std::mutex> lck(mutex_); stat_map_[stat_key].Update(stat); } void AddReceiveBufferFullCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + if(!running_){ + return; + } std::lock_guard<std::mutex> lck(mutex_); std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); stat_map_[stat_key].AddReceiveBufferFullCount(count); } void AddTooLongMsgCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + if (!running_) { + return; + } std::lock_guard<std::mutex> lck(mutex_); std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); stat_map_[stat_key].AddTooLongMsgCount(count); } void AddMetadataFailCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + if (!running_) { + return; + } std::lock_guard<std::mutex> lck(mutex_); std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); stat_map_[stat_key].AddMetadataFailCount(count); @@ -87,6 +96,7 @@ class MetricManager { if (update_thread_.joinable()) { update_thread_.join(); } + LOG_INFO("Metric manager exited"); } }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h index 26aa387083..c304ead157 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h @@ -32,6 +32,7 @@ class MsgManager { mutable std::mutex mutex_; uint32_t queue_limit_; bool enable_share_msg_; + bool exit_= false; MsgManager() { uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); uint32_t buffer_num = SdkConfig::getInstance()->recv_buf_size_ / data_capacity_; @@ -48,6 +49,11 @@ class MsgManager { AddMsg(msg_ptr); } } + ~MsgManager(){ + std::lock_guard<std::mutex> lck(mutex_); + exit_ = true; + LOG_INFO("Msg manager exited"); + } public: static MsgManager *GetInstance() { @@ -55,7 +61,7 @@ class MsgManager { return &instance; } SdkMsgPtr GetMsg() { - if (!enable_share_msg_) { + if (!enable_share_msg_ || exit_) { return nullptr; } std::lock_guard<std::mutex> lck(mutex_); @@ -67,7 +73,7 @@ class MsgManager { return buf; } void AddMsg(const SdkMsgPtr &msg_ptr) { - if (nullptr == msg_ptr || !enable_share_msg_) { + if (nullptr == msg_ptr || !enable_share_msg_ || exit_) { return; } std::lock_guard<std::mutex> lck(mutex_); @@ -78,7 +84,7 @@ class MsgManager { } void AddMsg(const std::vector<SdkMsgPtr> &user_msg_vector) { - if (!enable_share_msg_) { + if (!enable_share_msg_ || exit_) { return; } std::lock_guard<std::mutex> lck(mutex_); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 3db331aeda..c84435ea89 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -21,6 +21,7 @@ #include "api_code.h" #include <fstream> +#include <curl/curl.h> #include "../config/ini_help.h" #include "../utils/capi_constant.h" @@ -41,11 +42,14 @@ ProxyManager::~ProxyManager() { if (update_conf_thread_.joinable()) { update_conf_thread_.join(); } + + curl_global_cleanup(); } void ProxyManager::Init() { timeout_ = SdkConfig::getInstance()->manager_url_timeout_; last_update_time_ = Utils::getCurrentMsTime(); if (__sync_bool_compare_and_swap(&inited_, false, true)) { + curl_global_init(CURL_GLOBAL_ALL); ReadLocalCache(); update_conf_thread_ = std::thread(&ProxyManager::Update, this); } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc index b3856b440b..7fe5ba4dc5 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc @@ -309,7 +309,6 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS, } CURL *curl = NULL; - curl_global_init(CURL_GLOBAL_ALL); curl = curl_easy_init(); if (!curl) { @@ -330,9 +329,7 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS, LOG_INFO("request from tdm:" << res); if (ret != 0) { LOG_ERROR("failed to request data from " << urlByDNS); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } @@ -341,25 +338,17 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS, curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code); if (code != 200) { LOG_ERROR("tdm responsed with code " << code); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); - + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } if (res.empty()) { LOG_ERROR("tdm return empty data"); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); - + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); + if (curl) curl_easy_cleanup(curl); return 0; } @@ -406,8 +395,6 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) { CURL *curl = NULL; struct curl_slist *list = NULL; - curl_global_init(CURL_GLOBAL_ALL); - curl = curl_easy_init(); if (!curl) { LOG_ERROR("failed to init curl object"); @@ -415,8 +402,7 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) { } // http header - list = curl_slist_append(list, - "Content-Type: application/x-www-form-urlencoded"); + list = curl_slist_append(list,"Content-Type: application/x-www-form-urlencoded"); if (request->need_auth && !request->auth_id.empty() && !request->auth_key.empty()) { @@ -446,9 +432,7 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) { if (ret != 0) { LOG_ERROR(curl_easy_strerror(ret)); LOG_ERROR("failed to request data from " << request->url.c_str()); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } @@ -456,26 +440,20 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) { int32_t code; curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code); if (code != 200) { - LOG_ERROR("tdm responsed with code " << code); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } if (res.empty()) { - LOG_ERROR("tdm return empty data"); - if (curl) - curl_easy_cleanup(curl); - curl_global_cleanup(); + LOG_ERROR("Empty response"); + if (curl) curl_easy_cleanup(curl); return SdkCode::kErrorCURL; } - // clean work + // Clean work curl_easy_cleanup(curl); - curl_global_cleanup(); return 0; }