This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new c807df39f9 [INLONG-11320][Audit] Add a metric monitoring system for the Audit Proxy itself (#11359)
c807df39f9 is described below

commit c807df39f92ac57423f1f7a12cb86bd1a7c29e96
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Tue Oct 15 17:34:46 2024 +0800

    [INLONG-11320][Audit] Add a metric monitoring system for the Audit Proxy itself (#11359)
---
 .../apache/inlong/audit/file/ConfigManager.java    | 25 +++++-
 .../apache/inlong/audit/metric/AbstractMetric.java | 23 +++++
 .../inlong/audit/config/ConfigConstants.java       | 31 +++++++
 .../inlong/audit/metric/MetricDimension.java       | 41 +++++++++
 .../org/apache/inlong/audit/metric/MetricItem.java | 44 ++++++++++
 .../apache/inlong/audit/metric/MetricsManager.java | 98 ++++++++++++++++++++++
 .../metric/prometheus/ProxyPrometheusMetric.java   | 83 ++++++++++++++++++
 .../org/apache/inlong/audit/node/Application.java  |  8 +-
 .../org/apache/inlong/audit/sink/KafkaSink.java    |  5 ++
 .../org/apache/inlong/audit/sink/PulsarSink.java   |  3 +
 .../org/apache/inlong/audit/sink/TubeSink.java     |  5 ++
 .../inlong/audit/source/ServerMessageHandler.java  | 14 ++++
 inlong-audit/conf/application.properties           |  8 +-
 13 files changed, 382 insertions(+), 6 deletions(-)

diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 68745efd23..62a2f551ca 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 public class ConfigManager {
 
@@ -94,12 +95,28 @@ public class ConfigManager {
         return null;
     }
 
-    public String getValue(String key) {
+    public <T> T getValue(String key, T defaultValue, Function<String, T> 
parser) {
         ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
-        if (holder != null) {
-            return holder.getHolder().get(key);
+        if (holder == null) {
+            return defaultValue;
         }
-        return null;
+        Object value = holder.getHolder().get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            return parser.apply((String) value);
+        } catch (Exception e) {
+            return defaultValue;
+        }
+    }
+
+    public String getValue(String key, String defaultValue) {
+        return getValue(key, defaultValue, Function.identity());
+    }
+
+    public int getValue(String key, int defaultValue) {
+        return getValue(key, defaultValue, Integer::parseInt);
     }
 
     private boolean updatePropertiesHolder(Map<String, String> result,
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
new file mode 100644
index 0000000000..4c2f627916
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+public interface AbstractMetric {
+
+    public void report();
+}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
new file mode 100644
index 0000000000..67d5183ce5
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Config constants
+ */
+public class ConfigConstants {
+
+    public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy";
+    public static final String KEY_PROMETHEUS_PORT = 
"audit.proxy.prometheus.port";
+    public static final int DEFAULT_PROMETHEUS_PORT = 10082;
+    public static final String KEY_PROXY_METRIC_CLASSNAME = 
"audit.proxy.metric.classname";
+    public static final String DEFAULT_PROXY_METRIC_CLASSNAME =
+            "org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric";
+}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java
new file mode 100644
index 0000000000..3f410330c0
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+public enum MetricDimension {
+
+    RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
+    RECEIVE_PACK_SUCCESS("receivePackSuccess"),
+    RECEIVE_SIZE_SUCCESS("receiveSizeSuccess"),
+    RECEIVE_COUNT_INVALID("receiveCountInvalid"),
+    RECEIVE_COUNT_EXPIRED("receiveCountExpired"),
+    SEND_COUNT_SUCCESS("sendCountSuccess"),
+    SEND_COUNT_FAILED("sendCountFailed"),
+    SEND_PACK_SUCCESS("sendPackSuccess"),
+    SEND_DURATION("sendDuration");
+
+    private final String key;
+
+    MetricDimension(String key) {
+        this.key = key;
+    }
+
+    public String getKey() {
+        return key;
+    }
+}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java
new file mode 100644
index 0000000000..a95f4e3484
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+import lombok.Data;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+@Data
+public class MetricItem {
+
+    public static final String K_DIMENSION_KEY = "dimensionName";
+    private AtomicLong receiveCountSuccess = new AtomicLong(0);
+    private AtomicLong receivePackSuccess = new AtomicLong(0);
+    private AtomicLong receiveSizeSuccess = new AtomicLong(0);
+    private AtomicLong receiveCountInvalid = new AtomicLong(0);
+    private AtomicLong receiveCountExpired = new AtomicLong(0);
+    private AtomicLong sendCountSuccess = new AtomicLong(0);
+    private AtomicLong sendCountFailed = new AtomicLong(0);
+    public void resetAllMetrics() {
+        receiveCountSuccess.set(0);
+        receivePackSuccess.set(0);
+        receiveSizeSuccess.set(0);
+        receiveCountInvalid.set(0);
+        receiveCountExpired.set(0);
+        sendCountSuccess.set(0);
+        sendCountFailed.set(0);
+    }
+}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
new file mode 100644
index 0000000000..433fc71848
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+import org.apache.inlong.audit.file.ConfigManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
+import static 
org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;
+
+public class MetricsManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MetricsManager.class);
+
+    private static class Holder {
+
+        private static final MetricsManager INSTANCE = new MetricsManager();
+    }
+
+    private AbstractMetric metric;
+
+    public void init(String metricName) {
+        try {
+            ConfigManager configManager = ConfigManager.getInstance();
+            String metricClassName = 
configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, 
DEFAULT_PROXY_METRIC_CLASSNAME);
+            LOGGER.info("Metric class name: {}", metricClassName);
+            Constructor<?> constructor = Class.forName(metricClassName)
+                    .getDeclaredConstructor(String.class, MetricItem.class, 
int.class);
+            constructor.setAccessible(true);
+            metric = (AbstractMetric) constructor.newInstance(metricName, 
metricItem,
+                    configManager.getValue(KEY_PROMETHEUS_PORT, 
DEFAULT_PROMETHEUS_PORT));
+
+            timer.scheduleWithFixedDelay(() -> {
+                metric.report();
+                metricItem.resetAllMetrics();
+            }, 0, 1, TimeUnit.MINUTES);
+        } catch (ClassNotFoundException | NoSuchMethodException | 
InstantiationException | IllegalAccessException
+                | InvocationTargetException exception) {
+            LOGGER.error("Init metrics manager has exception: ", exception);
+        }
+    }
+
+    public static MetricsManager getInstance() {
+        return Holder.INSTANCE;
+    }
+
+    private final MetricItem metricItem = new MetricItem();
+    protected final ScheduledExecutorService timer = 
Executors.newSingleThreadScheduledExecutor();
+
+    public void addReceiveCountInvalid(long count) {
+        metricItem.getReceiveCountInvalid().addAndGet(count);
+    }
+
+    public void addReceiveCountExpired(long count) {
+        metricItem.getReceiveCountExpired().addAndGet(count);
+    }
+
+    public void addReceiveSuccess(long count, long pack, long size) {
+        metricItem.getReceiveCountSuccess().addAndGet(count);
+        metricItem.getReceivePackSuccess().addAndGet(pack);
+        metricItem.getReceiveSizeSuccess().addAndGet(size);
+    }
+
+    public void addSendSuccess(long count) {
+        metricItem.getSendCountSuccess().addAndGet(count);
+    }
+    public void addSendFailed(long count) {
+        metricItem.getSendCountFailed().addAndGet(count);
+    }
+    public void shutdown() {
+        timer.shutdown();
+    }
+}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
new file mode 100644
index 0000000000..07c2397743
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric.prometheus;
+
+import org.apache.inlong.audit.metric.AbstractMetric;
+import org.apache.inlong.audit.metric.MetricDimension;
+import org.apache.inlong.audit.metric.MetricItem;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.exporter.HTTPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * PrometheusMetric
+ */
+public class ProxyPrometheusMetric extends Collector implements AbstractMetric 
{
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyPrometheusMetric.class);
+    private static final String HELP_DESCRIPTION = "help";
+
+    private final MetricItem metricItem;
+    private final String metricName;
+    private HTTPServer server;
+
+    public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int 
prometheusPort) {
+        this.metricName = metricName;
+        this.metricItem = metricItem;
+        try {
+            server = new HTTPServer(prometheusPort);
+            this.register();
+        } catch (IOException e) {
+            LOGGER.error("Construct proxy prometheus metric has IOException", 
e);
+        }
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples.Sample> samples = Arrays.asList(
+                createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, 
metricItem.getReceiveCountSuccess().doubleValue()),
+                createSample(MetricDimension.RECEIVE_PACK_SUCCESS, 
metricItem.getReceivePackSuccess().doubleValue()),
+                createSample(MetricDimension.RECEIVE_SIZE_SUCCESS, 
metricItem.getReceiveSizeSuccess().doubleValue()),
+                createSample(MetricDimension.RECEIVE_COUNT_INVALID, 
metricItem.getReceiveCountInvalid().doubleValue()),
+                createSample(MetricDimension.RECEIVE_COUNT_EXPIRED, 
metricItem.getReceiveCountExpired().doubleValue()),
+                createSample(MetricDimension.SEND_COUNT_SUCCESS, 
metricItem.getSendCountSuccess().doubleValue()),
+                createSample(MetricDimension.SEND_COUNT_FAILED, 
metricItem.getSendCountFailed().doubleValue()));
+
+        MetricFamilySamples metricFamilySamples =
+                new MetricFamilySamples(metricName, Type.GAUGE, 
HELP_DESCRIPTION, samples);
+
+        return Collections.singletonList(metricFamilySamples);
+    }
+
+    private MetricFamilySamples.Sample createSample(MetricDimension key, 
double value) {
+        return new MetricFamilySamples.Sample(metricName, 
Collections.singletonList(MetricItem.K_DIMENSION_KEY),
+                Collections.singletonList(key.getKey()), value);
+    }
+
+    @Override
+    public void report() {
+        LOGGER.info("Report proxy prometheus metric: {} ", 
metricItem.toString());
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index 4ab7f29e51..1aa7f6b7d0 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.audit.node;
 
 import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -58,8 +59,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static 
org.apache.inlong.audit.config.ConfigConstants.AUDIT_PROXY_SERVER_NAME;
+
 /**
- * 
  * Application
  */
 public class Application {
@@ -259,6 +261,7 @@ public class Application {
 
     /**
      * main
+     *
      * @param args
      */
     public static void main(String[] args) {
@@ -344,9 +347,12 @@ public class Application {
                 @Override
                 public void run() {
                     appReference.stop();
+                    MetricsManager.getInstance().shutdown();
                 }
             });
 
+            MetricsManager.getInstance().init(AUDIT_PROXY_SERVER_NAME);
+
         } catch (Exception e) {
             logger.error("A fatal error occurred while running. Exception 
follows.", e);
         }
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index dc2c7c154d..db2a63c46d 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.audit.sink;
 
 import org.apache.inlong.audit.base.HighPriorityThreadFactory;
 import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
 import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -385,6 +386,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
     }
 
     public void handleMessageSendSuccess(EventStat es) {
+        MetricsManager.getInstance().addSendSuccess(1);
+
         // Statistics tube performance
         totalKafkaSuccSendCnt.incrementAndGet();
         totalKafkaSuccSendSize.addAndGet(es.getEvent().getBody().length);
@@ -494,6 +497,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
                 } else {
                     logger.warn("Send message failed, error message: {}, 
resendQueue size: {}, event:{}",
                             e.getMessage(), resendQueue.size(), 
es.getEvent().hashCode());
+
+                    MetricsManager.getInstance().addSendFailed(1);
                 }
 
                 es.incRetryCnt();
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index a081c32cb8..c99cb9ce16 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.audit.sink;
 
 import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.metric.MetricsManager;
 import org.apache.inlong.audit.sink.pulsar.CreatePulsarClientCallBack;
 import org.apache.inlong.audit.sink.pulsar.PulsarClientService;
 import org.apache.inlong.audit.sink.pulsar.SendMessageCallBack;
@@ -319,6 +320,7 @@ public class PulsarSink extends AbstractSink
 
     @Override
     public void handleMessageSendSuccess(Object result, EventStat eventStat) {
+        MetricsManager.getInstance().addSendSuccess(1);
         /*
          * Statistics pulsar performance
          */
@@ -346,6 +348,7 @@ public class PulsarSink extends AbstractSink
 
     @Override
     public void handleMessageSendException(EventStat eventStat, Object e) {
+        MetricsManager.getInstance().addSendFailed(1);
         if (e instanceof TooLongFrameException) {
             PulsarSink.this.overflow = true;
         } else if (e instanceof ProducerQueueIsFullError) {
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
index c6cefcb088..f2eab52615 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.audit.sink;
 import org.apache.inlong.audit.base.HighPriorityThreadFactory;
 import org.apache.inlong.audit.consts.ConfigConstants;
 import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
 import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -313,6 +314,8 @@ public class TubeSink extends AbstractSink implements Configurable {
      * Send message of success.
      */
     public void handleMessageSendSuccess(EventStat es) {
+        MetricsManager.getInstance().addSendSuccess(1);
+
         // Statistics tube performance
         totalTubeSuccSendCnt.incrementAndGet();
         totalTubeSuccSendSize.addAndGet(es.getEvent().getBody().length);
@@ -630,6 +633,8 @@ public class TubeSink extends AbstractSink implements Configurable {
                 return;
             }
 
+            MetricsManager.getInstance().addSendFailed(1);
+
             // handle sent error
             if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
                 logger.warn("Send message failed, error message: {}, 
resendQueue size: {}, event:{}",
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 7768ef5e8c..7595586dd4 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.audit.source;
 
+import org.apache.inlong.audit.metric.MetricsManager;
 import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
 import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
 import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
@@ -142,13 +143,20 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
     private AuditReply handleRequest(AuditRequest auditRequest) throws 
Exception {
         if (auditRequest == null) {
+
+            MetricsManager.getInstance().addReceiveCountInvalid(1);
+
             throw new Exception("audit request cannot be null");
         }
+
         AuditReply reply = AuditReply.newBuilder()
                 .setRequestId(auditRequest.getRequestId())
                 .setRspCode(RSP_CODE.SUCCESS)
                 .build();
         List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+
+        MetricsManager.getInstance().addReceiveSuccess(bodyList.size(), 1, 
auditRequest.getSerializedSize());
+
         int errorMsgBody = 0;
         LOGGER.debug("Receive message count: {}", 
auditRequest.getMsgBodyCount());
         for (AuditMessageBody auditMessageBody : bodyList) {
@@ -156,6 +164,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             if (msgDays >= this.msgValidThresholdDays) {
                 LOGGER.debug("Discard the data as it is from {} days ago, only 
the data with a log timestamp"
                         + " less than {} days is valid", msgDays, 
this.msgValidThresholdDays);
+
+                MetricsManager.getInstance().addReceiveCountExpired(1);
+
                 continue;
             }
             AuditData auditData = new AuditData();
@@ -194,6 +205,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         }
 
         if (errorMsgBody != 0) {
+
+            MetricsManager.getInstance().addReceiveCountInvalid(errorMsgBody);
+
             reply = reply.toBuilder()
                     .setMessage("writing data error, discard it, error body 
count=" + errorMsgBody)
                     .setRspCode(RSP_CODE.FAILED)
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 17fd3461f6..063b01aa8f 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -50,4 +50,10 @@ audit.kafka.group.id=audit-consumer-group
 audit.store.jdbc.driver=com.mysql.cj.jdbc.Driver
 audit.store.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
 audit.store.jdbc.username=root
-audit.store.jdbc.password=inlong
\ No newline at end of file
+audit.store.jdbc.password=inlong
+
+############################
+# metric config
+# org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the default monitoring
+###########################
+audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
\ No newline at end of file

Reply via email to