This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bb4c0eaa4 [INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK (#10855)
2bb4c0eaa4 is described below

commit 2bb4c0eaa42ebf57c4fbd13d2214864d2e134408
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Aug 22 17:17:10 2024 +0800

    [INLONG-10851][SDK] Support multiple protocols for DataProxy C++ SDK (#10855)
---
 .../dataproxy-sdk-cpp/src/client/tcp_client.cc     |   2 +-
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc       |  31 ++--
 .../dataproxy-sdk-cpp/src/config/sdk_conf.h        |   7 +-
 .../dataproxy-sdk-cpp/src/group/recv_group.cc      |  10 +-
 .../dataproxy-sdk-cpp/src/group/recv_group.h       |   3 +
 .../src/manager/metric_manager.cc                  |   1 +
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 133 +++--------------
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.h  |   3 +-
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  |   2 +-
 .../dataproxy-sdk-cpp/src/metric/environment.h     |   7 +-
 .../dataproxy-sdk-cpp/src/metric/metric.h          |   4 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   1 +
 .../dataproxy-sdk-cpp/src/utils/parse_json.cc      | 160 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/utils/parse_json.h       |  35 +++++
 14 files changed, 248 insertions(+), 151 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index c3b7692809..1e4d8bcedf 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -307,7 +307,7 @@ void TcpClient::UpdateMetric() {
     stat.Update(it.second);
     it.second.ResetStat();
   }
-  LOG_INFO(stat.ToString() << CLIENT_INFO);
+  LOG_INFO(stat.GetSendMetricInfo() << CLIENT_INFO);
 }
 
 void TcpClient::HeartBeat(bool only_heart_heat) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 5f067528af..6ff642472f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -121,8 +121,6 @@ void SdkConfig::defaultInit() {
 
   // manager parameters
   manager_url_ = constants::kManagerURL;
-  enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
-  manager_cluster_url_ = constants::kManagerClusterURL;
   manager_update_interval_ = constants::kManagerUpdateInterval;
   manager_url_timeout_ = constants::kManagerTimeout;
   max_proxy_num_ = constants::kMaxProxyNum;
@@ -329,22 +327,7 @@ void SdkConfig::InitManagerParam(const rapidjson::Value 
&doc) {
   } else {
     manager_url_ = constants::kManagerURL;
   }
-  // manager cluster url
-  if (doc.HasMember("manager_cluster_url") &&
-      doc["manager_cluster_url"].IsString()) {
-    const rapidjson::Value &obj = doc["manager_cluster_url"];
-    manager_cluster_url_ = obj.GetString();
-  } else {
-    manager_cluster_url_ = constants::kManagerClusterURL;
-  }
-  // enable manager from cluster
-  if (doc.HasMember("enable_manager_url_from_cluster") &&
-      doc["enable_manager_url_from_cluster"].IsBool()) {
-    const rapidjson::Value &obj = doc["enable_manager_url_from_cluster"];
-    enable_manager_url_from_cluster_ = obj.GetBool();
-  } else {
-    enable_manager_url_from_cluster_ = constants::kEnableManagerFromCluster;
-  }
+
   // manager update interval
   if (doc.HasMember("manager_update_interval") &&
       doc["manager_update_interval"].IsInt() &&
@@ -522,6 +505,13 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
   } else {
     max_instance_ = constants::kMaxInstance;
   }
+
+  if (doc.HasMember("extend_report") && doc["extend_report"].IsBool()) {
+    const rapidjson::Value &obj = doc["extend_report"];
+    extend_report_ = obj.GetBool();
+  } else {
+    extend_report_ = constants::kExtendReport;
+  }
 }
 
 bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string 
&localhost) {
@@ -589,11 +579,6 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("log_level: " << log_level_);
   LOG_INFO("log_path: " << log_path_.c_str());
   LOG_INFO("manager_url: " << manager_url_.c_str());
-  LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
-  LOG_INFO(
-      "enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
-      ? "true"
-      : "false");
   LOG_INFO("manager_update_interval:  minutes" << manager_update_interval_);
   LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
   LOG_INFO("max_tcp_num: " << max_proxy_num_);
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index f120343f07..608e076b3b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -25,6 +25,8 @@
 #include <stdint.h>
 #include <string>
 #include <vector>
+#include <atomic>
+#include "../utils/capi_constant.h"
 
 namespace inlong {
 class SdkConfig {
@@ -40,7 +42,7 @@ private:
   void InitAuthParm(const rapidjson::Value &doc);
   void OthersParam(const rapidjson::Value &doc);
   bool GetLocalIPV4Address(std::string& err_info, std::string& localhost);
-  SdkConfig() { defaultInit(); };
+  SdkConfig():extend_report_(false) { defaultInit(); };
 
       public:
   // cache parameter
@@ -80,8 +82,6 @@ private:
 
   // manager parameters
   std::string manager_url_;
-  bool enable_manager_url_from_cluster_;
-  std::string manager_cluster_url_;
   uint32_t manager_update_interval_; // Automatic update interval, minutes
   uint32_t manager_url_timeout_;     // URL parsing timeout, seconds
   uint64_t max_proxy_num_;
@@ -114,6 +114,7 @@ private:
   uint32_t buf_size_;
 
   volatile bool parsed_ = false;
+  bool extend_report_;
 
   void defaultInit();
   static SdkConfig *getInstance();
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 1ffeeef8dc..ec64292a1e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -50,6 +50,8 @@ RecvGroup::RecvGroup(const std::string &group_key, 
std::shared_ptr<SendManager>
   last_pack_time_ = Utils::getCurrentMsTime();
   max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
   local_ip_ = SdkConfig::getInstance()->local_ip_;
+  group_id_key_ = SdkConfig::getInstance()->extend_report_ ? "bid=" : 
"groupId=";
+  stream_id_key_ = SdkConfig::getInstance()->extend_report_ ? "&tid=" : 
"&streamId=";
   LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ 
<< ",max_recv_size:" << max_recv_size_);
 }
 
@@ -224,8 +226,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,uint32_t &
         streamId_num_ == 0) {
       groupId_num = 0;
       streamId_num = 0;
-      groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
-          "&streamId=" + msgs[0]->inlong_stream_id_;
+      groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ + 
stream_id_key_ + msgs[0]->inlong_stream_id_;
       char_groupId_flag = 0x4;
     } else {
       groupId_num = groupId_num_;
@@ -245,7 +246,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,uint32_t &
             "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
             "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
     } else {
-      attr = "groupId=" + msgs[0]->inlong_group_id_ +
+      attr = group_id_key_ + msgs[0]->inlong_group_id_ +
           "&streamId=" + msgs[0]->inlong_stream_id_;
     }
     *(uint16_t *)bodyBegin = htons(attr.size());
@@ -296,8 +297,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char 
*pack_data,uint32_t &
 
     // attr
     std::string attr;
-    attr = "groupId=" + msgs[0]->inlong_group_id_ +
-        "&streamId=" + msgs[0]->inlong_stream_id_;
+    attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + 
msgs[0]->inlong_stream_id_;
 
     attr += "&dt=" + std::to_string(data_time_);
     attr += "&mid=" + std::to_string(uniq_id_);
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index 839c49e14d..64681de0bb 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -63,6 +63,9 @@ class RecvGroup {
   std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_;
   std::queue<SendBufferPtrT> fail_queue_;
 
+  std::string group_id_key_;
+  std::string stream_id_key_;
+
   void DoDispatchMsg();
   bool IsZipAndOperate(std::string& res, uint32_t real_cur_len);
   inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
index 061abc0678..c4d31ac941 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
@@ -39,6 +39,7 @@ void MetricManager::InitEnvironment() {
   environment_.setVersion(constants::kVersion);
   environment_.setPid(getpid());
   environment_.setIp(SdkConfig::getInstance()->local_ip_);
+  environment_.SetExtendReport(SdkConfig::getInstance()->extend_report_);
 }
 void MetricManager::Run() {
   prctl(PR_SET_NAME, "metric-manager");
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 1653d68de0..3db331aeda 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -18,13 +18,15 @@
  */
 
 #include "proxy_manager.h"
+
+#include "api_code.h"
+#include <fstream>
+
 #include "../config/ini_help.h"
 #include "../utils/capi_constant.h"
 #include "../utils/logger.h"
 #include "../utils/utils.h"
-#include "api_code.h"
-#include <fstream>
-#include <rapidjson/document.h>
+#include "../utils/parse_json.h"
 
 namespace inlong {
 const uint64_t MINUTE = 60000;
@@ -110,104 +112,6 @@ void ProxyManager::DoUpdate() {
   LOG_INFO("finish ProxyManager DoUpdate.");
 }
 
-int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
-                                  const std::string &meta_data,
-                                  ProxyInfoVec &proxy_info_vec) {
-  rapidjson::Document doc;
-  if (doc.Parse(meta_data.c_str()).HasParseError()) {
-    LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":"
-                                                 << doc.GetErrorOffset());
-    return SdkCode::kErrorParseJson;
-  }
-
-  if (!(doc.HasMember("success") && doc["success"].IsBool() &&
-        doc["success"].GetBool())) {
-    LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success: not "
-              "exist or false"
-              << inlong_group_id.c_str());
-    return SdkCode::kErrorParseJson;
-  }
-  // check data valid
-  if (!doc.HasMember("data") || doc["data"].IsNull()) {
-    LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data: not exist 
"
-              "or null"
-              << inlong_group_id.c_str());
-    return SdkCode::kErrorParseJson;
-  }
-
-  // check nodelist valid
-  const rapidjson::Value &clusterInfo = doc["data"];
-  if (!clusterInfo.HasMember("nodeList") || clusterInfo["nodeList"].IsNull()) {
-    LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or null"
-              << inlong_group_id.c_str());
-    return SdkCode::kErrorParseJson;
-  }
-
-  // check nodeList isn't empty
-  const rapidjson::Value &nodeList = clusterInfo["nodeList"];
-  if (nodeList.GetArray().Size() == 0) {
-    LOG_ERROR("empty nodeList of inlong_group_id:%s"
-              << inlong_group_id.c_str());
-    return SdkCode::kErrorParseJson;
-  }
-  // check clusterId
-  if (!clusterInfo.HasMember("clusterId") ||
-      !clusterInfo["clusterId"].IsInt() ||
-      clusterInfo["clusterId"].GetInt() < 0) {
-    LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a integer"
-              << inlong_group_id.c_str());
-    return SdkCode::kErrorParseJson;
-  }
-  groupid_2_cluster_id_update_map_[inlong_group_id] =
-      clusterInfo["clusterId"].GetInt();
-
-  // check load
-  int32_t load = 0;
-  if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
-      !clusterInfo["load"].IsNull()) {
-    const rapidjson::Value &obj = clusterInfo["load"];
-    load = obj.GetInt();
-  } else {
-    load = 0;
-  }
-
-  // proxy list
-  for (auto &proxy : nodeList.GetArray()) {
-    std::string ip;
-    std::string id;
-    int32_t port;
-    if (proxy.HasMember("ip") && !proxy["ip"].IsNull())
-      ip = proxy["ip"].GetString();
-    else {
-      LOG_ERROR("this ip info is null");
-      continue;
-    }
-    if (proxy.HasMember("port") && !proxy["port"].IsNull()) {
-      if (proxy["port"].IsString())
-        port = std::stoi(proxy["port"].GetString());
-      else if (proxy["port"].IsInt())
-        port = proxy["port"].GetInt();
-    }
-
-    else {
-      LOG_ERROR("this ip info is null or negative");
-      continue;
-    }
-    if (proxy.HasMember("id") && !proxy["id"].IsNull()) {
-      if (proxy["id"].IsString())
-        id = proxy["id"].GetString();
-      else if (proxy["id"].IsInt())
-        id = proxy["id"].GetInt();
-    } else {
-      LOG_WARN("there is no id info of inlong_group_id");
-      continue;
-    }
-    proxy_info_vec.emplace_back(id, ip, port, load);
-  }
-
-  return SdkCode::kSuccess;
-}
-
 int32_t ProxyManager::GetProxy(const std::string &key,
                                ProxyInfoVec &proxy_info_vec) {
   if (constants::IsolationLevel::kLevelOne ==
@@ -407,16 +311,15 @@ void ProxyManager::UpdateProxy(
       LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
       continue;
     }
-    std::string url;
-    if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
-      url = SdkConfig::getInstance()->manager_cluster_url_;
-    else {
-      url =
-          SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
+    std::string url = SdkConfig::getInstance()->manager_url_ + "/" + 
groupid2cluster.first;
+    if (SdkConfig::getInstance()->extend_report_) {
+      url = SdkConfig::getInstance()->manager_url_ + "?bid=" + 
groupid2cluster.first + "&net_tag=all&ip=" +
+          SdkConfig::getInstance()->local_ip_;
     }
+
     std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
-                            "&version=" + constants::kVersion +
-                            "&protocolType=" + constants::kProtocolType;
+        "&version=" + constants::kVersion +
+        "&protocolType=" + constants::kProtocolType;
     LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
                                     << "proxy cfg url " << url.c_str()
                                     << "post_data:" << post_data.c_str());
@@ -441,7 +344,7 @@ void ProxyManager::UpdateProxy(
       if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
           groupid_2_proxy_map_.end()) {
         LOG_WARN("failed to request from manager, use previous "
-                 << groupid2cluster.first);
+                     << groupid2cluster.first);
         continue;
       }
       if (!SdkConfig::getInstance()->enable_local_cache_) {
@@ -456,10 +359,14 @@ void ProxyManager::UpdateProxy(
     }
 
     ProxyInfoVec proxyInfoVec;
-    ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
+    if (SdkConfig::getInstance()->extend_report_) {
+      ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, 
groupid_2_cluster_id_update_map_, proxyInfoVec);
+    } else {
+      ret = ParseJson::ParseProxyInfo(groupid2cluster.first, meta_data, 
proxyInfoVec, groupid_2_cluster_id_update_map_);
+    }
+
     if (ret != SdkCode::kSuccess) {
-      LOG_ERROR("failed to parse groupid:%s json proxy list "
-                << groupid2cluster.first.c_str());
+      LOG_ERROR("Failed to parse json: " << meta_data);
       continue;
     }
     if (!proxyInfoVec.empty()) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 419bc60798..2dec2f572e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -27,6 +27,7 @@
 #include <vector>
 
 namespace inlong {
+using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>;
 class ProxyManager {
 private:
   uint32_t timeout_;
@@ -53,7 +54,7 @@ private:
   uint64_t last_update_time_;
 
   int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
-                      ProxyInfoVec &proxy_info_vec);
+                      ProxyInfoVec &proxy_info_vec,GroupId2ClusterIdMap 
&group_id_2_cluster_id);
 
 public:
   ProxyManager(){};
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 0af18f7ccb..fe830c4cb0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -64,7 +64,7 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string 
&send_group_key) {
   unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_);
   auto send_group_map = send_group_map_.find(send_group_key);
   if (send_group_map == send_group_map_.end()) {
-    LOG_ERROR("fail to get send group, group_id:%s" << send_group_key);
+    LOG_ERROR("Fail to get send group, group key:" << send_group_key);
     return nullptr;
   }
   if (send_group_map->second.empty()) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
index e3d4a9bcc8..8d64faadc7 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
@@ -27,6 +27,7 @@ class Environment {
   std::string version_;
   std::string ip_;
   uint64_t pid_;
+  bool extend_report_;
   const std::string &getType() const { return type_; }
   void setType(const std::string &type) { type_ = type; }
   std::string getVersion() { return version_; }
@@ -35,12 +36,14 @@ class Environment {
   void setIp(const std::string &ip) { ip_ = ip; }
   uint64_t getPid() const { return pid_; }
   void setPid(uint64_t pid) { pid_ = pid; }
+  void SetExtendReport(bool extend_report) {extend_report_ = extend_report;}
 
   std::string ToString() const {
     std::stringstream metric;
     metric << "local ip[" << ip_ << "] ";
-    metric << "version [" << version_ << "] ";
-    metric << "pid [" << pid_ << "] ";
+    metric << "version[" << version_ << "] ";
+    metric << "pid[" << pid_ << "] ";
+    metric << "extend report[" << extend_report_ << "]";
     return metric.str();
   }
 };
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
index 87bd991b28..1dcbc33afa 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
@@ -130,7 +130,7 @@ class Metric {
     metric << "msg[" << send_success_msg_num_ << "] ";
     metric << "failed-pack[" << send_failed_pack_num_ << "] ";
     metric << "msg[" << send_failed_msg_num_ << "] ";
-    metric << "trans[" << getTransTime() << "] ";
+    metric << "trans[" << getTransTime() << "]";
     return metric.str();
   }
   std::string ToString() const {
@@ -142,7 +142,7 @@ class Metric {
     metric << "trans[" << getTransTime() << "] ";
     metric << "buffer full[" << receive_buffer_full_count_ << "] ";
     metric << "too long msg[" << too_long_msg_count_ << "] ";
-    metric << "metadata fail[" << metadata_fail_count_ << "] ";
+    metric << "metadata fail[" << metadata_fail_count_ << "]";
     return metric.str();
   }
 };
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index a1753b8255..f97cacf484 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -93,6 +93,7 @@ static const bool kEnableSetAffinity = false;
 static const uint32_t kMaskCPUAffinity = 0xff;
 static const uint16_t kExtendField = 0;
 static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max();
+static const bool kExtendReport = false;
 
 // http basic auth
 static const char kBasicAuthHeader[] = "Authorization:";
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc
new file mode 100644
index 0000000000..b1e4ebff7e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "parse_json.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/document.h"
+#include "../core/api_code.h"
+#include "../utils/logger.h"
+
+namespace inlong {
+static const char kJsonKeySuccess[] = "success";
+static const char kJsonKeyData[] = "data";
+static const char kJsonKeyNodeList[] = "nodeList";
+static const char kJsonKeyClusterId[] = "clusterId";
+static const char kJsonKeyLoad[] = "load";
+static const char kJsonKeyIp[] = "ip";
+static const char kJsonKeyPort[] = "port";
+static const char kJsonKeyId[] = "id";
+static const char kJsonKeySize[] = "size";
+static const char kJsonKeyClusterIdV2[] = "cluster_id";
+static const char kJsonKeyAddress[] = "address";
+static const char kJsonKeyHost[] = "host";
+
+int32_t ParseJson::ParseProxyInfo(const std::string &inlong_group_id,
+                                  const std::string &meta_data,
+                                  ProxyInfoVec &proxy_info_vec,
+                                  GroupId2ClusterIdMap &group_id_2_cluster_id) 
{
+  rapidjson::Document doc;
+  if (doc.Parse(meta_data.c_str()).HasParseError()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!(doc.HasMember(kJsonKeySuccess) && doc[kJsonKeySuccess].IsBool() &&
+      doc[kJsonKeySuccess].GetBool())) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!doc.HasMember(kJsonKeyData) || doc[kJsonKeyData].IsNull()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &clusterInfo = doc[kJsonKeyData];
+  if (!clusterInfo.HasMember(kJsonKeyNodeList) || 
clusterInfo[kJsonKeyNodeList].IsNull()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &nodeList = clusterInfo[kJsonKeyNodeList];
+  if (nodeList.GetArray().Size() <= 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!clusterInfo.HasMember(kJsonKeyClusterId) ||
+      !clusterInfo[kJsonKeyClusterId].IsInt() ||
+      clusterInfo[kJsonKeyClusterId].GetInt() <= 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  group_id_2_cluster_id[inlong_group_id] = 
clusterInfo[kJsonKeyClusterId].GetInt();
+
+  int32_t load = 0;
+  if (clusterInfo.HasMember(kJsonKeyLoad) && clusterInfo[kJsonKeyLoad].IsInt() 
&&
+      !clusterInfo[kJsonKeyLoad].IsNull()) {
+    load = clusterInfo[kJsonKeyLoad].GetInt();
+  }
+
+  for (auto &proxy : nodeList.GetArray()) {
+    std::string ip;
+    std::string id;
+    int32_t port;
+    if (!proxy.HasMember(kJsonKeyIp) || proxy[kJsonKeyIp].IsNull()) {
+      continue;
+    }
+    ip = proxy[kJsonKeyIp].GetString();
+
+    if (!proxy.HasMember(kJsonKeyPort) || proxy[kJsonKeyPort].IsNull()) {
+      continue;
+    }
+    if (proxy[kJsonKeyPort].IsString()) port = 
std::stoi(proxy[kJsonKeyPort].GetString());
+    if (proxy[kJsonKeyPort].IsInt()) port = proxy[kJsonKeyPort].GetInt();
+
+    if (!proxy.HasMember(kJsonKeyId) || proxy[kJsonKeyId].IsNull()) {
+      continue;
+    }
+    if (proxy[kJsonKeyId].IsString()) id = proxy[kJsonKeyId].GetString();
+    if (proxy[kJsonKeyId].IsInt()) id = proxy[kJsonKeyId].GetInt();
+
+    proxy_info_vec.emplace_back(id, ip, port, load);
+  }
+  return SdkCode::kSuccess;
+}
+int32_t ParseJson::ParseProxyInfo(const std::string &inlong_group_id, const 
std::string &meta_data,
+                                  GroupId2ClusterIdMap &group_id_2_cluster_id, 
ProxyInfoVec &proxy_info_vec) {
+  rapidjson::Document doc;
+  if (doc.Parse(meta_data.c_str()).HasParseError()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!doc.HasMember(kJsonKeySize) || !doc[kJsonKeySize].IsInt() || 
doc[kJsonKeySize].IsNull()
+      || doc[kJsonKeySize].GetInt() <= 0) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (doc.HasMember(kJsonKeyClusterIdV2) && doc[kJsonKeyClusterIdV2].IsInt() 
&& !doc[kJsonKeyClusterIdV2].IsNull()) {
+    const rapidjson::Value &obj = doc[kJsonKeyClusterIdV2];
+    group_id_2_cluster_id[inlong_group_id] = obj.GetInt();
+  } else {
+    return SdkCode::kErrorParseJson;
+  }
+
+  int32_t load = 0;
+  if (doc.HasMember(kJsonKeyLoad) && doc[kJsonKeyLoad].IsInt() && 
!doc[kJsonKeyLoad].IsNull()) {
+    load = doc[kJsonKeyLoad].GetInt();
+  }
+
+  if (!doc.HasMember(kJsonKeyAddress) || doc[kJsonKeyAddress].IsNull()) {
+    return SdkCode::kErrorParseJson;
+  }
+
+  const rapidjson::Value &host_list = doc[kJsonKeyAddress];
+  for (auto &info : host_list.GetArray()) {
+    std::string id, ip;
+    if (!info.HasMember(kJsonKeyHost) || info[kJsonKeyHost].IsNull()) {
+      continue;
+    }
+    ip = info[kJsonKeyHost].GetString();
+
+    if (!info.HasMember(kJsonKeyPort) || info[kJsonKeyPort].IsNull()) {
+      continue;
+    }
+
+    int32_t port;
+    if (info[kJsonKeyPort].IsString()) port = 
std::stoi(info[kJsonKeyPort].GetString());
+    if (info[kJsonKeyPort].IsInt()) port = info[kJsonKeyPort].GetInt();
+
+    if (!info.HasMember(kJsonKeyId) || info[kJsonKeyId].IsNull()) {
+      continue;
+    }
+    if (info[kJsonKeyId].IsString()) id = info[kJsonKeyId].GetString();
+    if (info[kJsonKeyId].IsInt()) id = 
std::to_string(info[kJsonKeyId].GetInt());
+
+    proxy_info_vec.emplace_back(id, ip, port, load);
+  }
+  return SdkCode::kSuccess;
+}
+}
+
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h
new file mode 100644
index 0000000000..01abdbdc6e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/parse_json.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef 
DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H
+#define 
DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H
+
+#include <string>
+#include <unordered_map>
+#include "../config/proxy_info.h"
+
+namespace inlong {
+using GroupId2ClusterIdMap = std::unordered_map<std::string, int32_t>;
+class ParseJson {
+ public:
+  static int32_t ParseProxyInfo(const std::string &inlong_group_id, const 
std::string &meta_data,
+                                ProxyInfoVec &proxy_info_vec, 
GroupId2ClusterIdMap &group_id_2_cluster_id);
+  static int32_t ParseProxyInfo(const std::string &inlong_group_id, const 
std::string &meta_data,
+                                GroupId2ClusterIdMap &group_id_2_cluster_id, 
ProxyInfoVec &proxy_info_vec);
+};
+}
+#endif 
//DATAPROXYSDK_INLONG_SDK_DATAPROXY_SDK_TWINS_DATAPROXY_SDK_CPP_SRC_UTILS_PARSE_JSON_H

Reply via email to