This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 47c710b28c [INLONG-10793][SDK] Added metric management for DataProxy 
CPP SDK (#10815)
47c710b28c is described below

commit 47c710b28ce044f9a6e695273c55ea20b72d574e
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Mon Aug 19 20:10:18 2024 +0800

    [INLONG-10793][SDK] Added metric management for DataProxy CPP SDK (#10815)
---
 .../dataproxy-sdk-cpp/CMakeLists.txt               |   3 +-
 .../dataproxy-sdk-cpp/src/core/api_imp.cc          |   4 +-
 .../src/manager/metric_manager.cc                  |  64 +++++++++
 .../dataproxy-sdk-cpp/src/manager/metric_manager.h |  93 +++++++++++++
 .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc |   1 +
 .../dataproxy-sdk-cpp/src/metric/environment.h     |  48 +++++++
 .../dataproxy-sdk-cpp/src/metric/metric.h          | 151 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/utils/CMakeLists.txt     |   7 +-
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h    |   1 +
 9 files changed, 368 insertions(+), 4 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
index ccbab35336..2a3183fff3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
@@ -33,6 +33,7 @@ include_directories(src/manager)
 include_directories(src/group)
 include_directories(src/protocol)
 include_directories(src/client)
+include_directories(src/metric)
 
 link_directories(${PROJECT_SOURCE_DIR}/third_party/lib)
 link_directories(${PROJECT_SOURCE_DIR}/third_party/lib64)
@@ -57,7 +58,7 @@ aux_source_directory(src/protocol PROTOCOL)
 aux_source_directory(src/client CLIENT)
 
 # static library
-add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} 
${GROUP} ${PROTOCOL} ${CLIENT})
+add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} 
${GROUP} ${PROTOCOL} ${CLIENT} ${METRIC})
 
 set_target_properties(dataproxy_sdk PROPERTIES OUTPUT_NAME "dataproxy_sdk" 
PREFIX "")
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index c4b493b068..13ce7223b5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -25,6 +25,8 @@
 #include <iostream>
 #include <signal.h>
 
+#include "metric_manager.h"
+
 namespace inlong {
 int32_t ApiImp::InitApi(const char *config_file_path) {
   if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
@@ -104,7 +106,7 @@ int32_t ApiImp::DoInit() {
   LOG_INFO("inlong dataproxy cpp sdk Init complete!");
 
   ProxyManager::GetInstance()->Init();
-  ProxyManager::GetInstance()->ReadLocalCache();
+  MetricManager::GetInstance()->Init();
 
   for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) 
{
     LOG_INFO("DoInit CheckConf inlong_group_id:"
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
new file mode 100644
index 0000000000..061abc0678
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "metric_manager.h"
+
+#include <rapidjson/document.h>
+#include <sys/prctl.h>
+#include <unistd.h>
+
+#include "../utils/logger.h"
+#include "../utils/utils.h"
+#include "../utils/capi_constant.h"
+
+namespace inlong {
+void MetricManager::Init() {
+  if (__sync_bool_compare_and_swap(&inited_, false, true)) {
+    update_thread_ = std::thread(&MetricManager::Run, this);
+  }
+  InitEnvironment();
+}
+void MetricManager::InitEnvironment() {
+  environment_.setType("cpp");
+  environment_.setVersion(constants::kVersion);
+  environment_.setPid(getpid());
+  environment_.setIp(SdkConfig::getInstance()->local_ip_);
+}
+void MetricManager::Run() {
+  prctl(PR_SET_NAME, "metric-manager");
+  while (running_) {
+    LOG_INFO("Start report metric");
+    PrintMetric();
+    
std::this_thread::sleep_for(std::chrono::minutes(constants::kMetricIntervalMinutes));
+  }
+}
+void MetricManager::PrintMetric() {
+  std::unordered_map<std::string, Metric> stat_map;
+  {
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map.swap(stat_map_);
+  }
+
+  LOG_INFO("[MetricManager] Environment info: " << environment_.ToString());
+
+  for (auto it : stat_map) {
+    LOG_INFO("[MetricManager] Metric info: " << it.first << " " << 
it.second.ToString());
+  }
+}
+}  // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
new file mode 100644
index 0000000000..5dc013f2d5
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <queue>
+#include <thread>
+#include <unordered_map>
+
+#include "../config/sdk_conf.h"
+#include "../metric/environment.h"
+#include "../metric/metric.h"
+
+#ifndef INLONG_METRIC_MANAGER_H
+#define INLONG_METRIC_MANAGER_H
+namespace inlong {
+using MetricMap = std::unordered_map<std::string, Metric>;
+static const char kStatJoiner = ' ';
+class MetricManager {
+ private:
+  mutable std::mutex mutex_;
+  MetricMap stat_map_;
+  std::thread update_thread_;
+  volatile bool inited_ = false;
+  bool running_ = true;
+  Environment environment_;
+  std::string coreParma_;
+
+  MetricManager() {
+
+  }
+
+ public:
+  static MetricManager *GetInstance() {
+    static MetricManager instance;
+    return &instance;
+  }
+  void Init();
+  void InitEnvironment();
+  void PrintMetric();
+  void Run();
+  void UpdateMetric(const std::string &stat_key, Metric &stat) {
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map_[stat_key].Update(stat);
+  }
+
+  void AddReceiveBufferFullCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    std::lock_guard<std::mutex> lck(mutex_);
+    std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
+    stat_map_[stat_key].AddReceiveBufferFullCount(count);
+  }
+
+  void AddTooLongMsgCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    std::lock_guard<std::mutex> lck(mutex_);
+    std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
+    stat_map_[stat_key].AddTooLongMsgCount(count);
+  }
+
+  void AddMetadataFailCount(const std::string &inlong_group_id, const 
std::string &inlong_stream_id,uint64_t count) {
+    std::lock_guard<std::mutex> lck(mutex_);
+    std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
+    stat_map_[stat_key].AddMetadataFailCount(count);
+  }
+
+  void Reset();
+
+  std::string BuildStatKey(const std::string &inlong_group_id, const 
std::string &inlong_stream_id) {
+    return inlong_group_id + kStatJoiner + inlong_stream_id;
+  }
+
+  ~MetricManager() {
+    running_ = false;
+    if (update_thread_.joinable()) {
+      update_thread_.join();
+    }
+  }
+};
+}  // namespace inlong
+#endif  // INLONG_METRIC_MANAGER_H
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index e4aef239e4..09014d728d 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -44,6 +44,7 @@ void ProxyManager::Init() {
   timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
   last_update_time_ = Utils::getCurrentMsTime();
   if (__sync_bool_compare_and_swap(&inited_, false, true)) {
+    ReadLocalCache();
     update_conf_thread_ = std::thread(&ProxyManager::Update, this);
   }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
new file mode 100644
index 0000000000..e3d4a9bcc8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_ENVIRONMENT_H
+#define INLONG_ENVIRONMENT_H
+
+#include <string>
+#include <sstream>
+namespace inlong {
+class Environment {
+ public:
+  std::string type_;
+  std::string version_;
+  std::string ip_;
+  uint64_t pid_;
+  const std::string &getType() const { return type_; }
+  void setType(const std::string &type) { type_ = type; }
+  std::string getVersion() { return version_; }
+  void setVersion(const std::string &version) { version_ = version; }
+  const std::string &getIp() const { return ip_; }
+  void setIp(const std::string &ip) { ip_ = ip; }
+  uint64_t getPid() const { return pid_; }
+  void setPid(uint64_t pid) { pid_ = pid; }
+
+  std::string ToString() const {
+    std::stringstream metric;
+    metric << "local ip[" << ip_ << "] ";
+    metric << "version [" << version_ << "] ";
+    metric << "pid [" << pid_ << "] ";
+    return metric.str();
+  }
+};
+} // namespace inlong
+#endif  // INLONG_ENVIRONMENT_H
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
new file mode 100644
index 0000000000..87bd991b28
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_METRIC_H
+#define INLONG_METRIC_H
+
+#include <cstdint>
+#include <sstream>
+namespace inlong {
+class Metric {
+ private:
+  uint64_t send_success_pack_num_;
+  uint64_t send_success_msg_num_;
+  uint64_t send_failed_pack_num_;
+  uint64_t send_failed_msg_num_;
+
+  uint64_t time_cost_;
+  uint64_t time_cost_0t32_;
+  uint64_t time_cost_32t128_;
+  uint64_t time_cost_128t1024_;
+  uint64_t time_cost_1024t65536_;
+
+  uint64_t receive_buffer_full_count_;
+  uint64_t too_long_msg_count_;
+  uint64_t metadata_fail_count_;
+
+ public:
+  Metric()
+      : send_success_pack_num_(0),
+        send_success_msg_num_(0),
+        send_failed_pack_num_(0),
+        send_failed_msg_num_(0),
+        time_cost_(0),
+        time_cost_0t32_(0),
+        time_cost_32t128_(0),
+        time_cost_128t1024_(0),
+        time_cost_1024t65536_(0),
+        receive_buffer_full_count_(0),
+        too_long_msg_count_(0),
+        metadata_fail_count_(0) {}
+
+  void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; }
+  void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; }
+  void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; }
+  void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; }
+  void AddReceiveBufferFullCount(uint64_t receive_buffer_full_count) {
+    receive_buffer_full_count_ += receive_buffer_full_count;
+  }
+  void AddTooLongMsgCount(uint64_t too_long_msg_count) { too_long_msg_count_ 
+= too_long_msg_count; }
+  void AddMetadataFailCount(uint64_t metadata_fail_count) { 
metadata_fail_count_ += metadata_fail_count; }
+
+  uint64_t GetSendSuccessPackNum() { return send_success_pack_num_; }
+  uint64_t GetSendSuccessMsgNum() { return send_success_msg_num_; }
+  uint64_t GetSendFailedPackNum() { return send_failed_pack_num_; }
+  uint64_t GetSendFailedMsgNum() { return send_failed_msg_num_; }
+  uint64_t GetTimeCost() { return time_cost_; }
+  uint64_t GetTimeCost0T32() const { return time_cost_0t32_; }
+  uint64_t GetTimeCost32T128() const { return time_cost_32t128_; }
+  uint64_t GetTimeCost128T1024() const { return time_cost_128t1024_; }
+  uint64_t GetTimeCost1024T65536() const { return time_cost_1024t65536_; }
+  uint64_t GetReceiveBufferFullCount() const { return 
receive_buffer_full_count_; }
+  uint64_t GetTooLongMsgCount() const { return too_long_msg_count_; }
+  uint64_t GetMetadataFailCount() const { return metadata_fail_count_; }
+
+  void AddTimeCost(uint64_t time_cost) {
+    time_cost_ += time_cost;
+    if (time_cost < 32) {
+      time_cost_0t32_++;
+      return;
+    } else if (time_cost < 128) {
+      time_cost_32t128_++;
+      return;
+    } else if (time_cost < 1024) {
+      time_cost_128t1024_++;
+      return;
+    } else {
+      time_cost_1024t65536_++;
+    }
+  }
+
+  void ResetStat() {
+    send_success_pack_num_ = 0;
+    send_success_msg_num_ = 0;
+    send_failed_pack_num_ = 0;
+    send_failed_msg_num_ = 0;
+    time_cost_ = 0;
+    time_cost_0t32_ = 0;
+    time_cost_32t128_ = 0;
+    time_cost_128t1024_ = 0;
+    time_cost_1024t65536_ = 0;
+    receive_buffer_full_count_ = 0;
+    too_long_msg_count_ = 0;
+    metadata_fail_count_ = 0;
+  }
+
+  void Update(Metric stat) {
+    send_success_pack_num_ += stat.send_success_pack_num_;
+    send_success_msg_num_ += stat.send_success_msg_num_;
+    send_failed_pack_num_ += stat.send_failed_pack_num_;
+    send_failed_msg_num_ += stat.send_failed_msg_num_;
+    time_cost_ += stat.time_cost_;
+    time_cost_0t32_ += stat.time_cost_0t32_;
+    time_cost_32t128_ += stat.time_cost_32t128_;
+    time_cost_128t1024_ += stat.time_cost_128t1024_;
+    time_cost_1024t65536_ += stat.time_cost_1024t65536_;
+  }
+
+  uint64_t getTransTime() const {
+    uint64_t pack_num = send_success_pack_num_ + send_failed_pack_num_ + 1;
+    return time_cost_ / pack_num;
+  }
+
+  std::string GetSendMetricInfo() const {
+    std::stringstream metric;
+    metric << "success-pack[" << send_success_pack_num_ << "] ";
+    metric << "msg[" << send_success_msg_num_ << "] ";
+    metric << "failed-pack[" << send_failed_pack_num_ << "] ";
+    metric << "msg[" << send_failed_msg_num_ << "] ";
+    metric << "trans[" << getTransTime() << "] ";
+    return metric.str();
+  }
+  std::string ToString() const {
+    std::stringstream metric;
+    metric << "success-pack[" << send_success_pack_num_ << "] ";
+    metric << "msg[" << send_success_msg_num_ << "] ";
+    metric << "failed-pack[" << send_failed_pack_num_ << "] ";
+    metric << "msg[" << send_failed_msg_num_ << "] ";
+    metric << "trans[" << getTransTime() << "] ";
+    metric << "buffer full[" << receive_buffer_full_count_ << "] ";
+    metric << "too long msg[" << too_long_msg_count_ << "] ";
+    metric << "metadata fail[" << metadata_fail_count_ << "] ";
+    return metric.str();
+  }
+};
+}  // namespace inlong
+
+#endif  // INLONG_METRIC_H
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
index 6ecf82464b..a95af7462f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
@@ -19,6 +19,9 @@
 
 cmake_minimum_required(VERSION 3.1)
 
-aux_source_directory(. UTILS_SRCS)
-
+file(GLOB UTILS_SRCS
+        "*.cc"
+        "*.h"
+        )
 add_library(utils STATIC ${UTILS_SRCS})
+
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 399bd1b348..dbac24f6a6 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -108,6 +108,7 @@ static const int32_t kWeight[30] = {1,  1,  1,  1,  1,  2,  
2,  2,   2,   2,
 static const char kCacheFile[] = ".proxy_list.ini";
 static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
 const int MAX_RETRY = 10;
+static const int kMetricIntervalMinutes = 1;
 
 } // namespace constants
 } // namespace inlong

Reply via email to