This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 35f153b36e [INLONG-10450][Sort] Add a new version SinkMetric class to 
support report audit information exactly once (#10451)
35f153b36e is described below

commit 35f153b36e9a8720e400aebab1bb9b3f961bc727
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed Jun 19 18:56:22 2024 +0800

    [INLONG-10450][Sort] Add a new version SinkMetric class to support report 
audit information exactly once (#10451)
---
 .../inlong/sort/base/metric/SinkExactlyMetric.java | 333 +++++++++++++++++++++
 1 file changed, 333 insertions(+)

diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
new file mode 100644
index 0000000000..5c916a1b56
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.inlong.audit.AuditReporterImpl;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
+/**
+ * A collection class for handling metrics
+ * <p>
+ * Copy from SinkMetricData
+ */
+public class SinkExactlyMetric implements MetricData, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final MetricGroup metricGroup;
+    private final Map<String, String> labels;
+    private final RegisteredMetric registeredMetric;
+    private AuditReporterImpl auditReporter;
+    private Counter numRecordsOut;
+    private Counter numBytesOut;
+    private Counter numRecordsOutForMeter;
+    private Counter numBytesOutForMeter;
+    private Counter dirtyRecordsOut;
+    private Counter dirtyBytesOut;
+    private Meter numRecordsOutPerSecond;
+    private Meter numBytesOutPerSecond;
+    private List<Integer> auditKeys;
+    private Long currentCheckpointId = 0L;
+    private Long lastCheckpointId = 0L;
+
+    public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+        this.labels = option.getLabels();
+        this.registeredMetric = option.getRegisteredMetric();
+
+        ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
+        ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
+        ThreadSafeCounter dirtyRecordsOutCounter = new ThreadSafeCounter();
+        ThreadSafeCounter dirtyBytesOutCounter = new ThreadSafeCounter();
+        switch (registeredMetric) {
+            case DIRTY:
+                registerMetricsForDirtyBytesOut(new ThreadSafeCounter());
+                registerMetricsForDirtyRecordsOut(new ThreadSafeCounter());
+                break;
+            case NORMAL:
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                registerMetricsForNumBytesOut(bytesOutCounter);
+                registerMetricsForNumRecordsOut(recordsOutCounter);
+                registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOutForMeter(new 
ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+                break;
+            default:
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                dirtyRecordsOutCounter.inc(option.getInitDirtyRecords());
+                dirtyBytesOutCounter.inc(option.getInitDirtyBytes());
+                registerMetricsForNumBytesOut(bytesOutCounter);
+                registerMetricsForNumRecordsOut(recordsOutCounter);
+                registerMetricsForDirtyRecordsOut(dirtyRecordsOutCounter);
+                registerMetricsForDirtyBytesOut(dirtyBytesOutCounter);
+                registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOutForMeter(new 
ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+                break;
+
+        }
+
+        if (option.getIpPorts().isPresent()) {
+            this.auditReporter = new AuditReporterImpl();
+            auditReporter.setAutoFlush(false);
+            auditReporter.setAuditProxy(option.getIpPortSet());
+            this.auditKeys = option.getInlongAuditKeys();
+        }
+    }
+
+    /**
+     * Users can custom counter that extends from {@link SimpleCounter}
+     * groupId and streamId and nodeId are label values,
+     * user can use it to filter metric data when using metric reporter 
Prometheus
+     * The following method is similar
+     */
+    public void registerMetricsForNumRecordsOutForMeter() {
+        registerMetricsForNumRecordsOutForMeter(new SimpleCounter());
+    }
+
+    /**
+     * Users can custom counter that extends from {@link Counter}
+     * groupId and streamId and nodeId are label values,
+     * user can use it to filter metric data when using metric reporter 
Prometheus
+     * The following method is similar
+     */
+    public void registerMetricsForNumRecordsOutForMeter(Counter counter) {
+        numRecordsOutForMeter = registerCounter(NUM_RECORDS_OUT_FOR_METER, 
counter);
+    }
+
+    public void registerMetricsForNumBytesOutForMeter() {
+        registerMetricsForNumBytesOutForMeter(new SimpleCounter());
+
+    }
+
+    public void registerMetricsForNumBytesOutForMeter(Counter counter) {
+        numBytesOutForMeter = registerCounter(NUM_BYTES_OUT_FOR_METER, 
counter);
+    }
+
+    public void registerMetricsForNumRecordsOut() {
+        registerMetricsForNumRecordsOut(new SimpleCounter());
+    }
+
+    public void registerMetricsForNumRecordsOut(Counter counter) {
+        numRecordsOut = registerCounter(NUM_RECORDS_OUT, counter);
+    }
+
+    public void registerMetricsForNumBytesOut() {
+        registerMetricsForNumBytesOut(new SimpleCounter());
+
+    }
+
+    public void registerMetricsForNumBytesOut(Counter counter) {
+        numBytesOut = registerCounter(NUM_BYTES_OUT, counter);
+    }
+
+    public void registerMetricsForNumRecordsOutPerSecond() {
+        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, 
this.numRecordsOutForMeter);
+    }
+
+    public void registerMetricsForNumBytesOutPerSecond() {
+        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, 
this.numBytesOutForMeter);
+    }
+
+    public void registerMetricsForDirtyRecordsOut() {
+        registerMetricsForDirtyRecordsOut(new SimpleCounter());
+    }
+
+    public void registerMetricsForDirtyRecordsOut(Counter counter) {
+        dirtyRecordsOut = registerCounter(DIRTY_RECORDS_OUT, counter);
+    }
+
+    public void registerMetricsForDirtyBytesOut() {
+        registerMetricsForDirtyBytesOut(new SimpleCounter());
+    }
+
+    public void registerMetricsForDirtyBytesOut(Counter counter) {
+        dirtyBytesOut = registerCounter(DIRTY_BYTES_OUT, counter);
+    }
+
+    public Counter getNumRecordsOut() {
+        return numRecordsOut;
+    }
+
+    public Counter getNumBytesOut() {
+        return numBytesOut;
+    }
+
+    public Counter getDirtyRecordsOut() {
+        return dirtyRecordsOut;
+    }
+
+    public Counter getDirtyBytesOut() {
+        return dirtyBytesOut;
+    }
+
+    public Meter getNumRecordsOutPerSecond() {
+        return numRecordsOutPerSecond;
+    }
+
+    public Meter getNumBytesOutPerSecond() {
+        return numBytesOutPerSecond;
+    }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public Map<String, String> getLabels() {
+        return labels;
+    }
+
+    public Counter getNumRecordsOutForMeter() {
+        return numRecordsOutForMeter;
+    }
+
+    public Counter getNumBytesOutForMeter() {
+        return numBytesOutForMeter;
+    }
+
+    public void invokeDirtyWithEstimate(Object o) {
+        invokeDirty(1, getDataSize(o));
+    }
+
+    public void invokeWithId(long rowCount, long rowSize, long dataTime) {
+        outputDefaultMetrics(rowCount, rowSize);
+        outputAuditMetricsWithId(rowCount, rowSize, dataTime);
+    }
+
+    private void outputAuditMetricsWithId(long rowCount, long rowSize, long 
dataTime) {
+        if (auditReporter != null) {
+            for (Integer key : auditKeys) {
+                auditReporter.add(
+                        currentCheckpointId,
+                        key,
+                        DEFAULT_AUDIT_TAG,
+                        getGroupId(),
+                        getStreamId(),
+                        dataTime,
+                        rowCount,
+                        rowSize,
+                        DEFAULT_AUDIT_VERSION);
+            }
+        }
+    }
+
+    private void outputDefaultMetrics(long rowCount, long rowSize) {
+        if (numRecordsOut != null) {
+            numRecordsOut.inc(rowCount);
+        }
+
+        if (numBytesOut != null) {
+            numBytesOut.inc(rowSize);
+        }
+
+        if (numRecordsOutForMeter != null) {
+            numRecordsOutForMeter.inc(rowCount);
+        }
+
+        if (numBytesOutForMeter != null) {
+            numBytesOutForMeter.inc(rowSize);
+        }
+    }
+
+    public void invokeDirty(long rowCount, long rowSize) {
+        if (dirtyRecordsOut != null) {
+            dirtyRecordsOut.inc(rowCount);
+        }
+
+        if (dirtyBytesOut != null) {
+            dirtyBytesOut.inc(rowSize);
+        }
+    }
+
+    public void flushAudit() {
+        if (auditReporter != null) {
+            auditReporter.flush(lastCheckpointId);
+        }
+    }
+    public void updateLastCheckpointId(Long checkpointId) {
+        lastCheckpointId = checkpointId;
+    }
+    public void updateCurrentCheckpointId(Long checkpointId) {
+        currentCheckpointId = checkpointId;
+    }
+
+    @Override
+    public String toString() {
+        switch (registeredMetric) {
+            case DIRTY:
+                return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", dirtyRecords=" + dirtyRecordsOut.getCount()
+                        + ", dirtyBytes=" + dirtyBytesOut.getCount()
+                        + '}';
+            case NORMAL:
+                return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", auditReporter=" + auditReporter
+                        + ", numRecordsOut=" + numRecordsOut.getCount()
+                        + ", numBytesOut=" + numBytesOut.getCount()
+                        + ", numRecordsOutForMeter=" + 
numRecordsOutForMeter.getCount()
+                        + ", numBytesOutForMeter=" + 
numBytesOutForMeter.getCount()
+                        + ", numRecordsOutPerSecond=" + 
numRecordsOutPerSecond.getRate()
+                        + ", numBytesOutPerSecond=" + 
numBytesOutPerSecond.getRate()
+                        + '}';
+            default:
+                return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", auditReporter=" + auditReporter
+                        + ", numRecordsOut=" + numRecordsOut.getCount()
+                        + ", numBytesOut=" + numBytesOut.getCount()
+                        + ", numRecordsOutForMeter=" + 
numRecordsOutForMeter.getCount()
+                        + ", numBytesOutForMeter=" + 
numBytesOutForMeter.getCount()
+                        + ", dirtyRecordsOut=" + dirtyRecordsOut.getCount()
+                        + ", dirtyBytesOut=" + dirtyBytesOut.getCount()
+                        + ", numRecordsOutPerSecond=" + 
numRecordsOutPerSecond.getRate()
+                        + ", numBytesOutPerSecond=" + 
numBytesOutPerSecond.getRate()
+                        + '}';
+        }
+    }
+}
\ No newline at end of file

Reply via email to