This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 615f59ea4 [INLONG-5459][Sort] Add audit for MySQL extract node (#5539)
615f59ea4 is described below

commit 615f59ea49c7931cad30020fe6170d7543e3d11b
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Mon Aug 15 13:11:03 2022 +0800

    [INLONG-5459][Sort] Add audit for MySQL extract node (#5539)
---
 .../org/apache/inlong/sort/base/Constants.java     |  1 -
 .../inlong/sort/base/metric/SourceMetricData.java  | 31 ++++++++
 .../sort/base/util/ValidateMetricOptionUtils.java  | 39 ++++++++++
 inlong-sort/sort-connectors/mysql-cdc/pom.xml      | 11 +++
 .../sort/cdc/debezium/DebeziumSourceFunction.java  | 25 ++++--
 .../apache/inlong/sort/cdc/mysql/MySqlSource.java  |  8 +-
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  | 27 ++++---
 .../sort/cdc/mysql/source/MySqlSourceBuilder.java  |  5 ++
 .../cdc/mysql/source/config/MySqlSourceConfig.java |  9 ++-
 .../source/config/MySqlSourceConfigFactory.java    | 13 ++--
 .../mysql/source/config/MySqlSourceOptions.java    |  6 --
 .../source/metrics/MySqlSourceReaderMetrics.java   | 91 ++++++++++++++++++----
 .../mysql/source/reader/MySqlRecordEmitter.java    |  9 +--
 .../mysql/table/MySqlTableInlongSourceFactory.java | 12 ++-
 .../sort/cdc/mysql/table/MySqlTableSource.java     | 22 ++++--
 15 files changed, 251 insertions(+), 58 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index b529bf2f0..19b1a90f4 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -75,7 +75,6 @@ public final class Constants {
     // sort send successfully
     public static final Integer AUDIT_SORT_OUTPUT = 8;
 
-
     public static final ConfigOption<String> INLONG_METRIC =
         ConfigOptions.key("inlong.metric")
             .stringType()
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e548784dc..bd82cc041 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,6 +22,8 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
 
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
@@ -41,12 +43,19 @@ public class SourceMetricData implements MetricData {
     private Counter numBytesIn;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
+    private final AuditImp auditImp;
 
     public SourceMetricData(String groupId, String streamId, String nodeId, 
MetricGroup metricGroup) {
+        this(groupId, streamId, nodeId, metricGroup, null);
+    }
+
+    public SourceMetricData(String groupId, String streamId, String nodeId, 
MetricGroup metricGroup,
+            AuditImp auditImp) {
         this.groupId = groupId;
         this.streamId = streamId;
         this.nodeId = nodeId;
         this.metricGroup = metricGroup;
+        this.auditImp = auditImp;
     }
 
     /**
@@ -128,4 +137,26 @@ public class SourceMetricData implements MetricData {
     public String getNodeId() {
         return nodeId;
     }
+
+    public void outputMetrics(long rowCountSize, long rowDataSize) {
+        outputMetricForFlink(rowCountSize, rowDataSize);
+        outputMetricForAudit(rowCountSize, rowDataSize);
+    }
+
+    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
+        if (auditImp != null) {
+            auditImp.add(
+                    Constants.AUDIT_SORT_INPUT,
+                    getGroupId(),
+                    getStreamId(),
+                    System.currentTimeMillis(),
+                    rowCountSize,
+                    rowDataSize);
+        }
+    }
+
+    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
+        this.numBytesIn.inc(rowDataSize);
+        this.numRecordsIn.inc(rowCountSize);
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
new file mode 100644
index 000000000..bd58e31ae
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
@@ -0,0 +1,39 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Utility for validating InLong metric and audit options.
+ */
+public class ValidateMetricOptionUtils {
+
+    /**
+     * Verifies that the inlong metric option is set whenever the inlong audit option is set.
+     * @param inlongMetric inlong.metric option value
+     * @param inlongAudit inlong.audit option value
+     */
+    public static void validateInlongMetricIfSetInlongAudit(String 
inlongMetric, String inlongAudit) {
+        if (inlongAudit != null && inlongMetric == null) {
+            throw new ValidationException("inlong metric is necessary when set 
inlong audit");
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml 
b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
index c29f2dcb9..a4e01dae3 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-format-json</artifactId>
@@ -112,6 +117,12 @@
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        
org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 882775d47..d5b3c676b 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
@@ -66,7 +67,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,6 +78,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static 
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static 
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -151,7 +155,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
     /**
      * The specific binlog offset to read from when the first startup.
      */
-    private final @Nullable DebeziumOffset specificOffset;
+    private final @Nullable
+    DebeziumOffset specificOffset;
 
     /**
      * Data for pending but uncommitted offsets.
@@ -221,6 +226,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     private String inlongMetric;
 
+    private String inlongAudit;
+
     private SourceMetricData sourceMetricData;
 
     // 
---------------------------------------------------------------------------------------
@@ -230,12 +237,14 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
             Validator validator,
-            String inlongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -413,7 +422,12 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, 
metricGroup);
+            AuditImp auditImp = null;
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new 
HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                auditImp = AuditImp.getInstance();
+            }
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, 
metricGroup, auditImp);
             sourceMetricData.registerMetricsForNumRecordsIn();
             sourceMetricData.registerMetricsForNumBytesIn();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
@@ -458,9 +472,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
                                 if (sourceMetricData != null) {
-                                    sourceMetricData.getNumRecordsIn().inc(1L);
-                                    sourceMetricData.getNumBytesIn()
-                                            
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    sourceMetricData.outputMetrics(1L,
+                                            
record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
                             }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index 4caf6be21..0c1a18c13 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -71,6 +71,7 @@ public class MySqlSource {
         private StartupOptions startupOptions = StartupOptions.initial();
         private DebeziumDeserializationSchema<T> deserializer;
         private String inlongMetric;
+        private String inlongAudit;
 
         public Builder<T> hostname(String hostname) {
             this.hostname = hostname;
@@ -173,6 +174,11 @@ public class MySqlSource {
             return this;
         }
 
+        public Builder<T> inlongAudit(String inlongAudit) {
+            this.inlongAudit = inlongAudit;
+            return this;
+        }
+
         /**
          * builder
          */
@@ -267,7 +273,7 @@ public class MySqlSource {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, specificOffset, new 
MySqlValidator(props), inlongMetric);
+                    deserializer, props, specificOffset, new 
MySqlValidator(props), inlongMetric, inlongAudit);
         }
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 3724d2a8e..df48edec2 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.mysql.MySqlValidator;
 import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
@@ -60,9 +62,12 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
 
@@ -144,16 +149,20 @@ public class MySqlSource<T>
         MySqlSourceConfig sourceConfig =
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
         String inlongMetric = sourceConfig.getInlongMetric();
+        String inlongAudit = sourceConfig.getInlongAudit();
         if (StringUtils.isNotEmpty(inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split("&");
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sourceReaderMetrics.registerMetricsForNumBytesIn(groupId, 
streamId, nodeId, "numBytesIn");
-            sourceReaderMetrics.registerMetricsForNumRecordsIn(groupId, 
streamId, nodeId, "numRecordsIn");
-            sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(groupId, 
streamId, nodeId, "numBytesInPerSecond");
-            
sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(groupId, streamId, 
nodeId,
-                    "numRecordsInPerSecond");
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]);
+            sourceReaderMetrics.setInlongGroupId(inlongMetricArray[1]);
+            sourceReaderMetrics.setInlongGroupId(inlongMetricArray[2]);
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new 
HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                sourceReaderMetrics.setAuditImp(AuditImp.getInstance());
+            }
+            
sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN);
+            
sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN);
+            
sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND);
+            
sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND);
         }
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>();
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 015dc111e..41f1c31e9 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -257,6 +257,11 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    public MySqlSourceBuilder<T> inlongAudit(String inlongAudit) {
+        this.configFactory.inlongAudit(inlongAudit);
+        return this;
+    }
+
     /**
      * Build the {@link MySqlSource}.
      *
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 2104e6f0b..d21d04836 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -68,6 +68,7 @@ public class MySqlSourceConfig implements Serializable {
     private final MySqlConnectorConfig dbzMySqlConfig;
 
     private final String inlongMetric;
+    private final String inlongAudit;
 
     MySqlSourceConfig(
             String hostname,
@@ -91,7 +92,8 @@ public class MySqlSourceConfig implements Serializable {
             boolean scanNewlyAddedTableEnabled,
             Properties dbzProperties,
             Properties jdbcProperties,
-            String inlongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -116,6 +118,7 @@ public class MySqlSourceConfig implements Serializable {
         this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
         this.jdbcProperties = jdbcProperties;
         this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     public String getHostname() {
@@ -218,4 +221,8 @@ public class MySqlSourceConfig implements Serializable {
     public String getInlongMetric() {
         return inlongMetric;
     }
+
+    public String getInlongAudit() {
+        return inlongAudit;
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 2bf6507ac..9eff7b8ee 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -75,12 +75,18 @@ public class MySqlSourceConfigFactory implements 
Serializable {
     private Properties dbzProperties;
 
     private String inlongMetric;
+    private String inlongAudit;
 
     public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
         this.inlongMetric = inlongMetric;
         return this;
     }
 
+    public MySqlSourceConfigFactory inlongAudit(String inlongAudit) {
+        this.inlongAudit = inlongAudit;
+        return this;
+    }
+
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
         return this;
@@ -341,10 +347,6 @@ public class MySqlSourceConfigFactory implements 
Serializable {
             jdbcProperties = new Properties();
         }
 
-        if (inlongMetric == null) {
-            inlongMetric = "";
-        }
-
         return new MySqlSourceConfig(
                 hostname,
                 port,
@@ -367,6 +369,7 @@ public class MySqlSourceConfigFactory implements 
Serializable {
                 scanNewlyAddedTableEnabled,
                 props,
                 jdbcProperties,
-                inlongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index ca32dd4a9..8a211cf19 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -30,12 +30,6 @@ import java.time.Duration;
  */
 public class MySqlSourceOptions {
 
-    public static final ConfigOption<String> INLONG_METRIC =
-            ConfigOptions.key("inlong.metric")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' 
+ NODE ID");
-
     public static final ConfigOption<String> HOSTNAME =
             ConfigOptions.key("hostname")
                     .stringType()
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index fc375b1d0..b301ed572 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
 
 /**
@@ -59,6 +61,10 @@ public class MySqlSourceReaderMetrics {
     private static String STREAM_ID = "streamId";
     private static String GROUP_ID = "groupId";
     private static String NODE_ID = "nodeId";
+    private String inlongGroupId;
+    private String inlongSteamId;
+    private String nodeId;
+    private AuditImp auditImp;
 
     public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -70,29 +76,30 @@ public class MySqlSourceReaderMetrics {
         metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
     }
 
-    public void registerMetricsForNumRecordsIn(String groupId, String 
streamId, String nodeId, String metricName) {
+    public void registerMetricsForNumRecordsIn(String metricName) {
         numRecordsIn =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, 
streamId).addGroup(NODE_ID, nodeId)
+                metricGroup.addGroup(GROUP_ID, 
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+                        .addGroup(NODE_ID, this.nodeId)
                         .counter(metricName);
     }
 
-    public void registerMetricsForNumBytesIn(String groupId, String streamId, 
String nodeId, String metricName) {
+    public void registerMetricsForNumBytesIn(String metricName) {
         numBytesIn =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, 
streamId).addGroup(NODE_ID, nodeId)
+                metricGroup.addGroup(GROUP_ID, 
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+                        .addGroup(NODE_ID, this.nodeId)
                         .counter(metricName);
     }
 
-    public void registerMetricsForNumRecordsInPerSecond(String groupId, String 
streamId, String nodeId,
-            String metricName) {
-        numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, 
groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
-                        nodeId)
-                .meter(metricName, new MeterView(this.numRecordsIn, 
TIME_SPAN_IN_SECONDS));
+    public void registerMetricsForNumRecordsInPerSecond(String metricName) {
+        numRecordsInPerSecond =
+                metricGroup.addGroup(GROUP_ID, 
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+                        .addGroup(NODE_ID, nodeId)
+                        .meter(metricName, new MeterView(this.numRecordsIn, 
TIME_SPAN_IN_SECONDS));
     }
 
-    public void registerMetricsForNumBytesInPerSecond(String groupId, String 
streamId, String nodeId,
-            String metricName) {
-        numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, 
groupId).addGroup(STREAM_ID, streamId)
-                .addGroup(NODE_ID, nodeId)
+    public void registerMetricsForNumBytesInPerSecond(String metricName) {
+        numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, 
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+                .addGroup(NODE_ID, this.nodeId)
                 .meter(metricName, new MeterView(this.numBytesIn, 
TIME_SPAN_IN_SECONDS));
     }
 
@@ -139,4 +146,62 @@ public class MySqlSourceReaderMetrics {
     public Meter getNumBytesInPerSecond() {
         return numBytesInPerSecond;
     }
+
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    public String getInlongSteamId() {
+        return inlongSteamId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    public void setInlongSteamId(String inlongSteamId) {
+        this.inlongSteamId = inlongSteamId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public AuditImp getAuditImp() {
+        return auditImp;
+    }
+
+    public void setAuditImp(AuditImp auditImp) {
+        this.auditImp = auditImp;
+    }
+
+    public void outputMetrics(long rowCountSize, long rowDataSize) {
+        outputMetricForFlink(rowCountSize, rowDataSize);
+        outputMetricForAudit(rowCountSize, rowDataSize);
+    }
+
+    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
+        if (this.auditImp != null) {
+            this.auditImp.add(
+                    Constants.AUDIT_SORT_INPUT,
+                    getInlongGroupId(),
+                    getInlongSteamId(),
+                    System.currentTimeMillis(),
+                    rowCountSize,
+                    rowDataSize);
+        }
+    }
+
+    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
+        if (this.numBytesIn != null) {
+            numBytesIn.inc(rowDataSize);
+        }
+        if (this.numRecordsIn != null) {
+            this.numRecordsIn.inc(rowCountSize);
+        }
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 3cfceb567..d2cc328f9 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,13 +145,8 @@ public final class MySqlRecordEmitter<T>
                     new Collector<T>() {
                         @Override
                         public void collect(final T t) {
-                            if (sourceReaderMetrics.getNumRecordsIn() != null) 
{
-                                sourceReaderMetrics.getNumRecordsIn().inc(1L);
-                            }
-                            if (sourceReaderMetrics.getNumBytesIn() != null) {
-                                sourceReaderMetrics.getNumBytesIn()
-                                        
.inc(t.toString().getBytes(StandardCharsets.UTF_8).length);
-                            }
+                            sourceReaderMetrics.outputMetrics(1L,
+                                    
t.toString().getBytes(StandardCharsets.UTF_8).length);
                             output.collect(t);
                         }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 3ffd679f7..c3c475e92 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
@@ -37,6 +38,8 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.APPEND_MODE;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -46,7 +49,6 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
-import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INLONG_METRIC;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -121,6 +123,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
 
         final ReadableConfig config = helper.getOptions();
         final String inlongMetric = config.get(INLONG_METRIC);
+        final String inlongAudit = config.get(INLONG_AUDIT);
+        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         final String hostname = config.get(HOSTNAME);
         final String username = config.get(USERNAME);
         final String password = config.get(PASSWORD);
@@ -186,7 +190,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
                 migrateAll,
-                inlongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     @Override
@@ -229,6 +234,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(HEARTBEAT_INTERVAL);
         options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
 
@@ -284,7 +290,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
      * Checks the given regular expression's syntax is valid.
      *
      * @param optionName the option name of the regex
-     * @param regex      The regular expression to be checked
+     * @param regex The regular expression to be checked
      * @throws ValidationException If the expression's syntax is invalid
      */
     private void validateRegex(String optionName, String regex) {
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 7038a0e3a..621f122f6 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -82,6 +82,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final Duration heartbeatInterval;
     private final boolean migrateAll;
     private final String inlongMetric;
+    private final String inlongAudit;
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -123,7 +124,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             StartupOptions startupOptions,
             Duration heartbeatInterval,
             boolean migrateAll,
-            String inlongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this(
                 physicalSchema,
                 port,
@@ -150,7 +152,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 new Properties(),
                 heartbeatInterval,
                 migrateAll,
-                inlongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     /**
@@ -182,7 +185,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Properties jdbcProperties,
             Duration heartbeatInterval,
             boolean migrateAll,
-            String inlongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -212,6 +216,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.heartbeatInterval = heartbeatInterval;
         this.migrateAll = migrateAll;
         this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -271,6 +276,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .jdbcProperties(jdbcProperties)
                             .heartbeatInterval(heartbeatInterval)
                             .inlongMetric(inlongMetric)
+                            .inlongAudit(inlongAudit)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -286,6 +292,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .debeziumProperties(dbzProperties)
                             .startupOptions(startupOptions)
                             .inlongMetric(inlongMetric)
+                            .inlongAudit(inlongAudit)
                             .deserializer(deserializer);
             Optional.ofNullable(serverId)
                     .ifPresent(serverId -> builder.serverId(Integer.parseInt(serverId)));
@@ -358,7 +365,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         jdbcProperties,
                         heartbeatInterval,
                         migrateAll,
-                        inlongMetric);
+                        inlongMetric,
+                        inlongAudit);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -397,7 +405,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(metadataKeys, that.metadataKeys)
                 && Objects.equals(jdbcProperties, that.jdbcProperties)
-                && Objects.equals(inlongMetric, that.inlongMetric);
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(inlongAudit, that.inlongAudit);
     }
 
     @Override
@@ -427,7 +436,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 metadataKeys,
                 scanNewlyAddedTableEnabled,
                 jdbcProperties,
-                inlongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     @Override

Reply via email to