This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 7178b39a3f [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357) 7178b39a3f is described below commit 7178b39a3fd30d1af9a26d5259f72b727167c519 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Nov 29 18:21:10 2023 +0800 [INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357) --- .../dataproxy-sdk-cpp/release/demo/send_demo.cc | 2 +- .../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 3 +- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 19 +++++----- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 29 +++++++++++++--- .../dataproxy-sdk-cpp/src/group/recv_group.h | 1 + .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 40 +++++++++++----------- .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 3 +- .../dataproxy-sdk-cpp/src/manager/send_manager.cc | 24 ++++++------- .../dataproxy-sdk-cpp/src/manager/send_manager.h | 1 + .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 3 ++ 10 files changed, 76 insertions(+), 49 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc index a4a783e0dc..b09f8463f1 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc @@ -53,7 +53,7 @@ int main(int argc, char const *argv[]) { cout << "---->start sdk successfully" << endl; int count = 1000; - string inlong_group_id = "test_cpp_sdk_20230404"; + string inlong_group_id = "test_cpp_sdk"; string inlong_stream_id = "stream1"; if (4 == argc) { inlong_group_id = argv[2]; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h index 578278c5fd..6d7b23dc21 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h @@ -83,7 +83,7 @@ private: uint64_t max_proxy_num_; uint64_t reserve_proxy_num_; uint32_t msg_type_; - bool enable_isolation_; + uint32_t isolation_level_; // Network parameters bool enable_tcp_nagle_; @@ -92,6 +92,7 @@ private: bool enable_balance_; bool enable_local_cache_; + // auth settings bool need_auth_; std::string auth_id_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index e9d679cd8c..319262adc2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -92,6 +92,7 @@ void SdkConfig::defaultInit() { load_balance_interval_ = constants::kLoadBalanceInterval; heart_beat_interval_ = constants::kHeartBeatInterval; enable_balance_ = constants::kEnableBalance; + isolation_level_=constants::IsolationLevel::kLevelSecond; // cache parameter send_buf_size_ = constants::kSendBufSize; @@ -120,7 +121,6 @@ void SdkConfig::defaultInit() { manager_update_interval_ = constants::kManagerUpdateInterval; manager_url_timeout_ = constants::kManagerTimeout; max_proxy_num_ = constants::kMaxProxyNum; - enable_isolation_ = constants::kEnableIsolation; reserve_proxy_num_ = constants::kReserveProxyNum; enable_local_cache_ = constants::kEnableLocalCache; @@ -360,13 +360,6 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { std::string inlong_group_ids_str = obj.GetString(); Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ","); } - // enable isolation - if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) { - const rapidjson::Value &obj = doc["enable_isolation"]; - enable_isolation_ = obj.GetBool(); - } else { - enable_isolation_ = constants::kEnableIsolation; - } // enable local cache if (doc.HasMember("enable_local_cache") && doc["enable_local_cache"].IsBool()) { @@ -375,6 +368,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { } else { enable_local_cache_ = constants::kEnableLocalCache; } + + // isolation level + if (doc.HasMember("isolation_level") && doc["isolation_level"].IsInt() && doc["isolation_level"].GetInt() > 0) { + const rapidjson::Value &obj = doc["isolation_level"]; + isolation_level_ = obj.GetInt(); + } else { + isolation_level_ = constants::IsolationLevel::kLevelSecond; + } } void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { @@ -564,7 +565,7 @@ void SdkConfig::ShowClientConfig() { LOG_INFO("auth_key: " << auth_key_.c_str()); LOG_INFO("max_group_id_num: " << max_group_id_num_); LOG_INFO("max_stream_id_num: " << max_stream_id_num_); - LOG_INFO("enable_isolation: " << enable_isolation_); + LOG_INFO("isolation_level: " << isolation_level_); } } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index 79931bd434..b852095f68 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -26,11 +26,13 @@ namespace inlong { const uint32_t DEFAULT_PACK_ATTR = 400; +const uint64_t LOG_SAMPLE=100; RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager) : cur_len_(0), groupId_num_(0), streamId_num_(0), msg_type_(SdkConfig::getInstance()->msg_type_), data_capacity_(SdkConfig::getInstance()->buf_size_), - send_manager_(send_manager),group_key_(group_key) { + send_manager_(send_manager),group_key_(group_key), + log_stat_(0){ data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR; @@ -70,22 +72,41 @@ int32_t RecvGroup::DoDispatchMsg() { last_pack_time_ = Utils::getCurrentMsTime(); std::lock_guard<std::mutex> lck(mutex_); if (group_key_.empty()) { - LOG_ERROR("groupId is empty, check!!"); + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("groupId is empty, check!!"); + log_stat_ = 0; + } return SdkCode::kInvalidInput; } if (msgs_.empty()) { - LOG_ERROR("no msg in msg_set, check!"); + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("no msg in msg_set, check!"); + log_stat_ = 0; + } return SdkCode::kMsgEmpty; } auto send_group = send_manager_->GetSendGroup(group_key_); if (send_group == nullptr) { - LOG_ERROR("failed to get send_buf, something gets wrong, checkout!"); + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send_buf, something gets wrong, checkout!"); + log_stat_ = 0; + } return SdkCode::kFailGetSendBuf; } if (!send_group->IsAvailable()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" + << group_key_ << " send group is not available!"); + log_stat_ = 0; + } return SdkCode::kFailGetConn; } if (send_group->IsFull()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" + << group_key_ << " send group is full!"); + log_stat_ = 0; + } return SdkCode::kSendBufferFull; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h index 2d1a9e6315..58d3dc70a3 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h @@ -48,6 +48,7 @@ private: uint64_t max_recv_size_; std::string group_key_; + uint64_t log_stat_; int32_t DoDispatchMsg(); void AddMsg(const std::string &msg, std::string client_ip, diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 1c5f440119..8c2dfc4c1e 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -254,11 +254,10 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, int32_t ProxyManager::GetProxy(const std::string &key, ProxyInfoVec &proxy_info_vec) { - if (SdkConfig::getInstance()->enable_isolation_) { + if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) { return GetProxyByGroupid(key, proxy_info_vec); - } else { - return GetProxyByClusterId(key, proxy_info_vec); } + return GetProxyByClusterId(key, proxy_info_vec); } int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, @@ -289,13 +288,14 @@ int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, return SdkCode::kSuccess; } -bool ProxyManager::HasProxy(const std::string &inlong_group_id) { - if (SdkConfig::getInstance()->enable_isolation_) { - return CheckGroupid(inlong_group_id); - } else { - return CheckClusterId(inlong_group_id); +bool ProxyManager::HasProxy(const std::string &group_key) { + if (constants::IsolationLevel::kLevelOne == + SdkConfig::getInstance()->isolation_level_) { + return CheckGroupid(group_key); } + return CheckClusterId(group_key); } + int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec) { unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_); @@ -319,15 +319,11 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id, return SdkCode::kSuccess; } std::string ProxyManager::GetGroupKey(const std::string &groupid) { - if (SdkConfig::getInstance()->enable_isolation_) { - return groupid; - } - unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_); - auto it = groupid_2_cluster_id_map_.find(groupid); - if (it == groupid_2_cluster_id_map_.end()) { - return ""; + if (constants::IsolationLevel::kLevelThird == + SdkConfig::getInstance()->isolation_level_) { + return GetClusterID(groupid); } - return it->second; + return groupid; } bool ProxyManager::CheckGroupid(const std::string &groupid) { unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_); @@ -346,9 +342,6 @@ bool ProxyManager::CheckClusterId(const std::string &cluster_id) { return true; } void ProxyManager::UpdateClusterId2ProxyMap() { - if (SdkConfig::getInstance()->enable_isolation_) { - return; - } unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_); for (const auto &it : groupid_2_cluster_id_update_map_) { ProxyInfoVec proxy_info_vec; @@ -440,5 +433,12 @@ std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) { LOG_INFO("RecoverFromLocalCache:" << groupid << ",local cache:" << meta_data); return meta_data; } - +std::string ProxyManager::GetClusterID(const std::string &groupid) { + unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_); + auto it = groupid_2_cluster_id_map_.find(groupid); + if (it == groupid_2_cluster_id_map_.end()) { + return ""; + } + return it->second; +} } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 6e1c58ef39..8ae859869f 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -68,7 +68,7 @@ public: int32_t GetProxyByClusterId(const std::string &cluster_id, ProxyInfoVec &proxy_info_vec); std::string GetGroupKey(const std::string &groupid); - bool HasProxy(const std::string &inlong_group_id); + bool HasProxy(const std::string &group_key); bool CheckGroupid(const std::string &groupid); bool CheckClusterId(const std::string &cluster_id); void UpdateClusterId2ProxyMap(); @@ -77,6 +77,7 @@ public: void ReadLocalCache(); void WriteLocalCache(); std::string RecoverFromLocalCache(const std::string&groupid); + std::string GetClusterID(const std::string &groupid); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc index 6104411b9a..0af18f7ccb 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc @@ -22,22 +22,15 @@ #include "proxy_manager.h" namespace inlong { SendManager::SendManager() : send_group_idx_(0) { - for (int32_t i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); - i++) { - LOG_INFO("SendManager, group_id:" - << SdkConfig::getInstance()->inlong_group_ids_[i] - << " send group num:" - << SdkConfig::getInstance()->per_groupid_thread_nums_); - std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey( - SdkConfig::getInstance()->inlong_group_ids_[i]); - AddSendGroup(send_group_key); - } + LOG_INFO("SendManager,send group num:" + << SdkConfig::getInstance()->per_groupid_thread_nums_); } SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) { - SendGroupPtr send_group_ptr = DoGetSendGroup(group_key); + std::string send_key= GetSendKey(group_key); + SendGroupPtr send_group_ptr = DoGetSendGroup(send_key); if (send_group_ptr == nullptr) { - AddSendGroup(group_key); + AddSendGroup(send_key); } return send_group_ptr; } @@ -84,5 +77,10 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) { } return send_group_vec[send_group_idx_]; } - +std::string SendManager::GetSendKey(const std::string &send_group_key) { + if (constants::IsolationLevel::kLevelSecond == SdkConfig::getInstance()->isolation_level_) { + return ProxyManager::GetInstance()->GetClusterID(send_group_key); + } + return send_group_key; +} } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h index fa627647f2..d6ab24bcbc 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h @@ -33,6 +33,7 @@ private: std::unordered_map<std::string, std::vector<SendGroupPtr>> send_group_map_; SendGroupPtr DoGetSendGroup(const std::string &send_group_key); void DoAddSendGroup(const std::string &send_group_key); + std::string GetSendKey(const std::string &send_group_key); volatile uint32_t send_group_idx_; public: diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index fe25ee7861..eaa2821263 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -25,6 +25,9 @@ namespace inlong { namespace constants { +enum IsolationLevel{ + kLevelOne =1, kLevelSecond =2, kLevelThird =3 +}; static const int32_t kMaxRequestTDMTimes = 4; static const char kAttrFormat[] = "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";