This is an automated email from the ASF dual-hosted git repository. doleyzi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2bb4c0eaa4 [INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK (#10855) 2bb4c0eaa4 is described below commit 2bb4c0eaa42ebf57c4fbd13d2214864d2e134408 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Thu Aug 22 17:17:10 2024 +0800 [INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK (#10855) --- .../dataproxy-sdk-cpp/src/client/tcp_client.cc | 2 +- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 31 ++-- .../dataproxy-sdk-cpp/src/config/sdk_conf.h | 7 +- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 10 +- .../dataproxy-sdk-cpp/src/group/recv_group.h | 3 + .../src/manager/metric_manager.cc | 1 + .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 133 +++-------------- .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 3 +- .../dataproxy-sdk-cpp/src/manager/send_manager.cc | 2 +- .../dataproxy-sdk-cpp/src/metric/environment.h | 7 +- .../dataproxy-sdk-cpp/src/metric/metric.h | 4 +- .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 1 + .../dataproxy-sdk-cpp/src/utils/parse_json.cc | 160 +++++++++++++++++++++ .../dataproxy-sdk-cpp/src/utils/parse_json.h | 35 +++++ 14 files changed, 248 insertions(+), 151 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc index c3b7692809..1e4d8bcedf 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc @@ -307,7 +307,7 @@ void TcpClient::UpdateMetric() { stat.Update(it.second); it.second.ResetStat(); } - LOG_INFO(stat.ToString() << CLIENT_INFO); + LOG_INFO(stat.GetSendMetricInfo() << CLIENT_INFO); } void TcpClient::HeartBeat(bool only_heart_heat) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 5f067528af..6ff642472f 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -121,8 +121,6 @@ void SdkConfig::defaultInit() { // manager parameters manager_url_ = constants::kManagerURL; - enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster; - manager_cluster_url_ = constants::kManagerClusterURL; manager_update_interval_ = constants::kManagerUpdateInterval; manager_url_timeout_ = constants::kManagerTimeout; max_proxy_num_ = constants::kMaxProxyNum; @@ -329,22 +327,7 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { } else { manager_url_ = constants::kManagerURL; } - // manager cluster url - if (doc.HasMember("manager_cluster_url") && - doc["manager_cluster_url"].IsString()) { - const rapidjson::Value &obj = doc["manager_cluster_url"]; - manager_cluster_url_ = obj.GetString(); - } else { - manager_cluster_url_ = constants::kManagerClusterURL; - } - // enable manager from cluster - if (doc.HasMember("enable_manager_url_from_cluster") && - doc["enable_manager_url_from_cluster"].IsBool()) { - const rapidjson::Value &obj = doc["enable_manager_url_from_cluster"]; - enable_manager_url_from_cluster_ = obj.GetBool(); - } else { - enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster; - } + // manager update interval if (doc.HasMember("manager_update_interval") && doc["manager_update_interval"].IsInt() && @@ -522,6 +505,13 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) { } else { max_instance_ = constants::kMaxInstance; } + + if (doc.HasMember("extend_report") && doc["extend_report"].IsBool()) { + const rapidjson::Value &obj = doc["extend_report"]; + extend_report_ = obj.GetBool(); + } else { + extend_report_ = constants::kExtendReport; + } } bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string &localhost) { @@ -589,11 +579,6 @@ void SdkConfig::ShowClientConfig() { LOG_INFO("log_level: " << log_level_); LOG_INFO("log_path: " << log_path_.c_str()); LOG_INFO("manager_url: " << manager_url_.c_str()); - LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str()); - LOG_INFO( - "enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_ - ? "true" - : "false"); LOG_INFO("manager_update_interval: minutes" << manager_update_interval_); LOG_INFO("manager_url_timeout: " << manager_url_timeout_); LOG_INFO("max_tcp_num: " << max_proxy_num_); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h index f120343f07..608e076b3b 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h @@ -25,6 +25,8 @@ #include <stdint.h> #include <string> #include <vector> +#include <atomic> +#include "../utils/capi_constant.h" namespace inlong { class SdkConfig { @@ -40,7 +42,7 @@ private: void InitAuthParm(const rapidjson::Value &doc); void OthersParam(const rapidjson::Value &doc); bool GetLocalIPV4Address(std::string& err_info, std::string& localhost); - SdkConfig() { defaultInit(); }; + SdkConfig():extend_report_(false) { defaultInit(); }; public: // cache parameter @@ -80,8 +82,6 @@ private: // manager parameters std::string manager_url_; - bool enable_manager_url_from_cluster_; - std::string manager_cluster_url_; uint32_t manager_update_interval_; // Automatic update interval, minutes uint32_t manager_url_timeout_; // URL parsing timeout, seconds uint64_t max_proxy_num_; @@ -114,6 +114,7 @@ private: uint32_t buf_size_; volatile bool parsed_ = false; + bool extend_report_; void defaultInit(); static SdkConfig *getInstance(); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index 1ffeeef8dc..ec64292a1e 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -50,6 +50,8 @@ RecvGroup::RecvGroup(const std::string &group_key, std::shared_ptr<SendManager> last_pack_time_ = Utils::getCurrentMsTime(); max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_; local_ip_ = SdkConfig::getInstance()->local_ip_; + group_id_key_ = SdkConfig::getInstance()->extend_report_ ? "bid=" : "groupId="; + stream_id_key_ = SdkConfig::getInstance()->extend_report_ ? "&tid=" : "&streamId="; LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ << ",max_recv_size:" << max_recv_size_); } @@ -224,8 +226,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t & streamId_num_ == 0) { groupId_num = 0; streamId_num = 0; - groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_; char_groupId_flag = 0x4; } else { groupId_num = groupId_num_; @@ -245,7 +246,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t & "&node1ip=" + SdkConfig::getInstance()->local_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); } else { - attr = "groupId=" + msgs[0]->inlong_group_id_ + + attr = group_id_key_ + msgs[0]->inlong_group_id_ + "&streamId=" + msgs[0]->inlong_stream_id_; } *(uint16_t *)bodyBegin = htons(attr.size()); @@ -296,8 +297,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t & // attr std::string attr; - attr = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_; attr += "&dt=" + std::to_string(data_time_); attr += "&mid=" + std::to_string(uniq_id_); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h index 839c49e14d..64681de0bb 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h @@ -63,6 +63,9 @@ class RecvGroup { std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_; std::queue<SendBufferPtrT> fail_queue_; + std::string group_id_key_; + std::string stream_id_key_; + void DoDispatchMsg(); bool IsZipAndOperate(std::string& res, uint32_t real_cur_len); inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc index 061abc0678..c4d31ac941 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc @@ -39,6 +39,7 @@ void MetricManager::InitEnvironment() { environment_.setVersion(constants::kVersion); environment_.setPid(getpid()); environment_.setIp(SdkConfig::getInstance()->local_ip_); + environment_.SetExtendReport(SdkConfig::getInstance()->extend_report_); } void MetricManager::Run() { prctl(PR_SET_NAME, "metric-manager"); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 1653d68de0..3db331aeda 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -18,13 +18,15 @@ */ #include "proxy_manager.h" + +#include "api_code.h" +#include <fstream> + #include "../config/ini_help.h" #include "../utils/capi_constant.h" #include "../utils/logger.h" #include "../utils/utils.h" -#include "api_code.h" -#include <fstream> -#include <rapidjson/document.h> +#include "../utils/parse_json.h" namespace inlong { const uint64_t MINUTE = 60000; @@ -110,104 +112,6 @@ void ProxyManager::DoUpdate() { LOG_INFO("finish ProxyManager DoUpdate."); } -int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, - const std::string &meta_data, - ProxyInfoVec &proxy_info_vec) { - rapidjson::Document doc; - if (doc.Parse(meta_data.c_str()).HasParseError()) { - LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":" - << doc.GetErrorOffset()); - return SdkCode::kErrorParseJson; - } - - if (!(doc.HasMember("success") && doc["success"].IsBool() && - doc["success"].GetBool())) { - LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success: not " - "exist or false" - << inlong_group_id.c_str()); - return SdkCode::kErrorParseJson; - } - // check data valid - if (!doc.HasMember("data") || doc["data"].IsNull()) { - LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data: not exist " - "or null" - << inlong_group_id.c_str()); - return SdkCode::kErrorParseJson; - } - - // check nodelist valid - const rapidjson::Value &clusterInfo = doc["data"]; - if (!clusterInfo.HasMember("nodeList") || clusterInfo["nodeList"].IsNull()) { - LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or null" - << inlong_group_id.c_str()); - return SdkCode::kErrorParseJson; - } - - // check nodeList isn't empty - const rapidjson::Value &nodeList = clusterInfo["nodeList"]; - if (nodeList.GetArray().Size() == 0) { - LOG_ERROR("empty nodeList of inlong_group_id:%s" - << inlong_group_id.c_str()); - return SdkCode::kErrorParseJson; - } - // check clusterId - if (!clusterInfo.HasMember("clusterId") || - !clusterInfo["clusterId"].IsInt() || - clusterInfo["clusterId"].GetInt() < 0) { - LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a integer" - << inlong_group_id.c_str()); - return SdkCode::kErrorParseJson; - } - groupid_2_cluster_id_update_map_[inlong_group_id] = - clusterInfo["clusterId"].GetInt(); - - // check load - int32_t load = 0; - if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() && - !clusterInfo["load"].IsNull()) { - const rapidjson::Value &obj = clusterInfo["load"]; - load = obj.GetInt(); - } else { - load = 0; - } - - // proxy list - for (auto &proxy : nodeList.GetArray()) { - std::string ip; - std::string id; - int32_t port; - if (proxy.HasMember("ip") && !proxy["ip"].IsNull()) - ip = proxy["ip"].GetString(); - else { - LOG_ERROR("this ip info is null"); - continue; - } - if (proxy.HasMember("port") && !proxy["port"].IsNull()) { - if (proxy["port"].IsString()) - port = std::stoi(proxy["port"].GetString()); - else if (proxy["port"].IsInt()) - port = proxy["port"].GetInt(); - } - - else { - LOG_ERROR("this ip info is null or negative"); - continue; - } - if (proxy.HasMember("id") && !proxy["id"].IsNull()) { - if (proxy["id"].IsString()) - id = proxy["id"].GetString(); - else if (proxy["id"].IsInt()) - id = proxy["id"].GetInt(); - } else { - LOG_WARN("there is no id info of inlong_group_id"); - continue; - } - proxy_info_vec.emplace_back(id, ip, port, load); - } - - return SdkCode::kSuccess; -} - int32_t ProxyManager::GetProxy(const std::string &key, ProxyInfoVec &proxy_info_vec) { if (constants::IsolationLevel::kLevelOne == @@ -407,16 +311,15 @@ void ProxyManager::UpdateProxy( LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first); continue; } - std::string url; - if (SdkConfig::getInstance()->enable_manager_url_from_cluster_) - url = SdkConfig::getInstance()->manager_cluster_url_; - else { - url = - SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first; + std::string url = SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first; + if (SdkConfig::getInstance()->extend_report_) { + url = SdkConfig::getInstance()->manager_url_ + "?bid=" + groupid2cluster.first + "&net_tag=all&ip=" + + SdkConfig::getInstance()->local_ip_; } + std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ + - "&version=" + constants::kVersion + - "&protocolType=" + constants::kProtocolType; + "&version=" + constants::kVersion + + "&protocolType=" + constants::kProtocolType; LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str() << "proxy cfg url " << url.c_str() << "post_data:" << post_data.c_str()); @@ -441,7 +344,7 @@ void ProxyManager::UpdateProxy( if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) { LOG_WARN("failed to request from manager, use previous " - << groupid2cluster.first); + << groupid2cluster.first); continue; } if (!SdkConfig::getInstance()->enable_local_cache_) { @@ -456,10 +359,14 @@ void ProxyManager::UpdateProxy( } ProxyInfoVec proxyInfoVec; - ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec); + if (SdkConfig::getInstance()->extend_report_) { + ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, groupid_2_cluster_id_update_map_, proxyInfoVec); + } else { + ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, proxyInfoVec, groupid_2_cluster_id_update_map_); + } + if (ret != SdkCode::kSuccess) { - LOG_ERROR("failed to parse groupid:%s json proxy list " - << groupid2cluster.first.c_str()); + LOG_ERROR("Failed to parse json: " << meta_data); continue; } if (!proxyInfoVec.empty()) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 419bc60798..2dec2f572e 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -27,6 +27,7 @@ #include <vector> namespace inlong { +using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>; class ProxyManager { private: uint32_t timeout_; @@ -53,7 +54,7 @@ private: uint64_t last_update_time_; int32_t ParseAndGet(const std::string &key, const std::string &meta_data, - ProxyInfoVec &proxy_info_vec); + ProxyInfoVec &proxy_info_vec,GroupId2ClusterIdMap &group_id_2_cluster_id); public: ProxyManager(){}; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc index 0af18f7ccb..fe830c4cb0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc @@ -64,7 +64,7 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) { unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_); auto send_group_map = send_group_map_.find(send_group_key); if (send_group_map == send_group_map_.end()) { - LOG_ERROR("fail to get send group, group_id:%s" << send_group_key); + LOG_ERROR("Fail to get send group, group key:" << send_group_key); return nullptr; } if (send_group_map->second.empty()) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h index e3d4a9bcc8..8d64faadc7 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h @@ -27,6 +27,7 @@ class Environment { std::string version_; std::string ip_; uint64_t pid_; + bool extend_report_; const std::string &getType() const { return type_; } void setType(const std::string &type) { type_ = type; } std::string getVersion() { return version_; } @@ -35,12 +36,14 @@ class Environment { void setIp(const std::string &ip) { ip_ = ip; } uint64_t getPid() const { return pid_; } void setPid(uint64_t pid) { pid_ = pid; } + void SetExtendReport(bool extend_report) {extend_report_ = extend_report;} std::string ToString() const { std::stringstream metric; metric << "local ip[" << ip_ << "] "; - metric << "version [" << version_ << "] "; - metric << "pid [" << pid_ << "] "; + metric << "version[" << version_ << "] "; + metric << "pid[" << pid_ << "] "; + metric << "extend report[" << extend_report_ << "]"; return metric.str(); } }; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h index 87bd991b28..1dcbc33afa 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h @@ -130,7 +130,7 @@ class Metric { metric << "msg[" << send_success_msg_num_ << "] "; metric << "failed-pack[" << send_failed_pack_num_ << "] "; metric << "msg[" << send_failed_msg_num_ << "] "; - metric << "trans[" << getTransTime() << "] "; + metric << "trans[" << getTransTime() << "]"; return metric.str(); } std::string ToString() const { @@ -142,7 +142,7 @@ class Metric { metric << "trans[" << getTransTime() << "] "; metric << "buffer full[" << receive_buffer_full_count_ << "] "; metric << "too long msg[" << too_long_msg_count_ << "] "; - metric << "metadata fail[" << metadata_fail_count_ << "] "; + metric << "metadata fail[" << metadata_fail_count_ << "]"; return metric.str(); } }; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index a1753b8255..f97cacf484 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -93,6 +93,7 @@ static const bool kEnableSetAffinity = false; static const uint32_t kMaskCPUAffinity = 0xff; static const uint16_t kExtendField = 0; static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max(); +static const bool kExtendReport = false; // http basic auth static const char kBasicAuthHeader[] = "Authorization:"; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc new file mode 100644 index 0000000000..b1e4ebff7e --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "parse_json.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/document.h" +#include "../core/api_code.h" +#include "../utils/logger.h" + +namespace inlong { +static const char kJsonKeySuccess[] = "success"; +static const char kJsonKeyData[] = "data"; +static const char kJsonKeyNodeList[] = "nodeList"; +static const char kJsonKeyClusterId[] = "clusterId"; +static const char kJsonKeyLoad[] = "load"; +static const char kJsonKeyIp[] = "ip"; +static const char kJsonKeyPort[] = "port"; +static const char kJsonKeyId[] = "id"; +static const char kJsonKeySize[] = "size"; +static const char kJsonKeyClusterIdV2[] = "cluster_id"; +static const char kJsonKeyAddress[] = "address"; +static const char kJsonKeyHost[] = "host"; + +int32_t ParseJson::ParseProxyInfo(const std::string &inlong_group_id, + const std::string &meta_data, + ProxyInfoVec &proxy_info_vec, + GroupId2ClusterIdMap &group_id_2_cluster_id) { + rapidjson::Document doc; + if (doc.Parse(meta_data.c_str()).HasParseError()) { + return SdkCode::kErrorParseJson; + } + + if (!(doc.HasMember(kJsonKeySuccess) && doc[kJsonKeySuccess].IsBool() && + doc[kJsonKeySuccess].GetBool())) { + return SdkCode::kErrorParseJson; + } + + if (!doc.HasMember(kJsonKeyData) || doc[kJsonKeyData].IsNull()) { + return SdkCode::kErrorParseJson; + } + + const rapidjson::Value &clusterInfo = doc[kJsonKeyData]; + if (!clusterInfo.HasMember(kJsonKeyNodeList) || clusterInfo[kJsonKeyNodeList].IsNull()) { + return SdkCode::kErrorParseJson; + } + + const rapidjson::Value &nodeList = clusterInfo[kJsonKeyNodeList]; + if (nodeList.GetArray().Size() <= 0) { + return SdkCode::kErrorParseJson; + } + + if (!clusterInfo.HasMember(kJsonKeyClusterId) || + !clusterInfo[kJsonKeyClusterId].IsInt() || + clusterInfo[kJsonKeyClusterId].GetInt() <= 0) { + return SdkCode::kErrorParseJson; + } + + group_id_2_cluster_id[inlong_group_id] = clusterInfo[kJsonKeyClusterId].GetInt(); + + int32_t load = 0; + if (clusterInfo.HasMember(kJsonKeyLoad) && clusterInfo[kJsonKeyLoad].IsInt() && + !clusterInfo[kJsonKeyLoad].IsNull()) { + load = clusterInfo[kJsonKeyLoad].GetInt(); + } + + for (auto &proxy : nodeList.GetArray()) { + std::string ip; + std::string id; + int32_t port; + if (!proxy.HasMember(kJsonKeyIp) || proxy[kJsonKeyIp].IsNull()) { + continue; + } + ip = proxy[kJsonKeyIp].GetString(); + + if (!proxy.HasMember(kJsonKeyPort) || proxy[kJsonKeyPort].IsNull()) { + continue; + } + if (proxy[kJsonKeyPort].IsString()) port = std::stoi(proxy[kJsonKeyPort].GetString()); + if (proxy[kJsonKeyPort].IsInt()) port = proxy[kJsonKeyPort].GetInt(); + + if (!proxy.HasMember(kJsonKeyId) || proxy[kJsonKeyId].IsNull()) { + continue; + } + if (proxy[kJsonKeyId].IsString()) id = proxy[kJsonKeyId].GetString(); + if (proxy[kJsonKeyId].IsInt()) id = proxy[kJsonKeyId].GetInt(); + + proxy_info_vec.emplace_back(id, ip, port, load); + } + return SdkCode::kSuccess; +} +int32_t ParseJson::ParseProxyInfo(const std::string &inlong_group_id, const std::string &meta_data, + GroupId2ClusterIdMap &group_id_2_cluster_id, ProxyInfoVec &proxy_info_vec) { + rapidjson::Document doc; + if (doc.Parse(meta_data.c_str()).HasParseError()) { + return SdkCode::kErrorParseJson; + } + + if (!doc.HasMember(kJsonKeySize) || !doc[kJsonKeySize].IsInt() || doc[kJsonKeySize].IsNull() + || doc[kJsonKeySize].GetInt() <= 0) { + return SdkCode::kErrorParseJson; + } + + if (doc.HasMember(kJsonKeyClusterIdV2) && doc[kJsonKeyClusterIdV2].IsInt() && !doc[kJsonKeyClusterIdV2].IsNull()) { + const rapidjson::Value &obj = doc[kJsonKeyClusterIdV2]; + group_id_2_cluster_id[inlong_group_id] = obj.GetInt(); + } else { + return SdkCode::kErrorParseJson; + } + + int32_t load = 0; + if (doc.HasMember(kJsonKeyLoad) && doc[kJsonKeyLoad].IsInt() && !doc[kJsonKeyLoad].IsNull()) { + load = doc[kJsonKeyLoad].GetInt(); + } + + if (!doc.HasMember(kJsonKeyAddress) || doc[kJsonKeyAddress].IsNull()) { + return SdkCode::kErrorParseJson; + } + + const rapidjson::Value &host_list = doc[kJsonKeyAddress]; + for (auto &info : host_list.GetArray()) { + std::string id, ip; + if (!info.HasMember(kJsonKeyHost) || info[kJsonKeyHost].IsNull()) { + continue; + } + ip = info[kJsonKeyHost].GetString(); + + if (!info.HasMember(kJsonKeyPort) || info[kJsonKeyPort].IsNull()) { + continue; + } + + int32_t port; + if (info[kJsonKeyPort].IsString()) port = std::stoi(info[kJsonKeyPort].GetString()); + if (info[kJsonKeyPort].IsInt()) port = info[kJsonKeyPort].GetInt(); + + if (!info.HasMember(kJsonKeyId) || info[kJsonKeyId].IsNull()) { + continue; + } + if (info[kJsonKeyId].IsString()) id = info[kJsonKeyId].GetString(); + if (info[kJsonKeyId].IsInt()) id = std::to_string(info[kJsonKeyId].GetInt()); + + proxy_info_vec.emplace_back(id, ip, port, load); + } + return SdkCode::kSuccess; +} +} + diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h new file mode 100644 index 0000000000..01abdbdc6e --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H +#define DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H + +#include <string> +#include <unordered_map> +#include "../config/proxy_info.h" + +namespace inlong { +using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>; +class ParseJson { + public: + static int32_t ParseProxyInfo(const std::string &inlong_group_id, const std::string &meta_data, + ProxyInfoVec &proxy_info_vec, GroupId2ClusterIdMap &group_id_2_cluster_id); + static int32_t ParseProxyInfo(const std::string &inlong_group_id, const std::string &meta_data, + GroupId2ClusterIdMap &group_id_2_cluster_id, ProxyInfoVec &proxy_info_vec); +}; +} +#endif //DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H