This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 615f59ea4 [INLONG-5459][Sort] Add audit for MySQL extract node (#5539) 615f59ea4 is described below commit 615f59ea49c7931cad30020fe6170d7543e3d11b Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Mon Aug 15 13:11:03 2022 +0800 [INLONG-5459][Sort] Add audit for MySQL extract node (#5539) --- .../org/apache/inlong/sort/base/Constants.java | 1 - .../inlong/sort/base/metric/SourceMetricData.java | 31 ++++++++ .../sort/base/util/ValidateMetricOptionUtils.java | 39 ++++++++++ inlong-sort/sort-connectors/mysql-cdc/pom.xml | 11 +++ .../sort/cdc/debezium/DebeziumSourceFunction.java | 25 ++++-- .../apache/inlong/sort/cdc/mysql/MySqlSource.java | 8 +- .../inlong/sort/cdc/mysql/source/MySqlSource.java | 27 ++++--- .../sort/cdc/mysql/source/MySqlSourceBuilder.java | 5 ++ .../cdc/mysql/source/config/MySqlSourceConfig.java | 9 ++- .../source/config/MySqlSourceConfigFactory.java | 13 ++-- .../mysql/source/config/MySqlSourceOptions.java | 6 -- .../source/metrics/MySqlSourceReaderMetrics.java | 91 ++++++++++++++++++---- .../mysql/source/reader/MySqlRecordEmitter.java | 9 +-- .../mysql/table/MySqlTableInlongSourceFactory.java | 12 ++- .../sort/cdc/mysql/table/MySqlTableSource.java | 22 ++++-- 15 files changed, 251 insertions(+), 58 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index b529bf2f0..19b1a90f4 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -75,7 +75,6 @@ public final class Constants { // sort send successfully public static final Integer AUDIT_SORT_OUTPUT = 8; - public static final ConfigOption<String> INLONG_METRIC = ConfigOptions.key("inlong.metric") .stringType() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index e548784dc..bd82cc041 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -22,6 +22,8 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.Constants; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; @@ -41,12 +43,19 @@ public class SourceMetricData implements MetricData { private Counter numBytesIn; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; + private final AuditImp auditImp; public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) { + this(groupId, streamId, nodeId, metricGroup, null); + } + + public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup, + AuditImp auditImp) { this.groupId = groupId; this.streamId = streamId; this.nodeId = nodeId; this.metricGroup = metricGroup; + this.auditImp = auditImp; } /** @@ -128,4 +137,26 @@ public class SourceMetricData implements MetricData { public String getNodeId() { return nodeId; } + + public void outputMetrics(long rowCountSize, long rowDataSize) { + outputMetricForFlink(rowCountSize, rowDataSize); + outputMetricForAudit(rowCountSize, rowDataSize); + } + + public void outputMetricForAudit(long rowCountSize, long rowDataSize) { + if (auditImp != null) { + auditImp.add( + Constants.AUDIT_SORT_INPUT, + getGroupId(), + getStreamId(), + System.currentTimeMillis(), + rowCountSize, + rowDataSize); + } + } + + public void outputMetricForFlink(long rowCountSize, long rowDataSize) { + this.numBytesIn.inc(rowDataSize); + this.numRecordsIn.inc(rowCountSize); + } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java new file mode 100644 index 000000000..bd58e31ae --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.util; + +import org.apache.flink.table.api.ValidationException; + +/** + * validate option tool + */ +public class ValidateMetricOptionUtils { + + /** + * validate inlong metric when set inlong audit + * @param inlongMetric inlong.metric option value + * @param inlongAudit inlong.audit option value + */ + public static void validateInlongMetricIfSetInlongAudit(String inlongMetric, String inlongAudit) { + if (inlongAudit != null && inlongMetric == null) { + throw new ValidationException("inlong metric is necessary when set inlong audit"); + } + } + +} diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-connectors/mysql-cdc/pom.xml index c29f2dcb9..a4e01dae3 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-connectors/mysql-cdc/pom.xml @@ -47,6 +47,11 @@ <artifactId>sort-connector-base</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>audit-sdk</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>sort-format-json</artifactId> @@ -112,6 +117,12 @@ </filter> </filters> <relocations> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern> + org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base + </shadedPattern> + </relocation> <relocation> <pattern>org.apache.kafka</pattern> <shadedPattern> diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index 882775d47..d5b3c676b 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer; @@ -66,7 +67,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -75,6 +78,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -151,7 +155,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> /** * The specific binlog offset to read from when the first startup. */ - private final @Nullable DebeziumOffset specificOffset; + private final @Nullable + DebeziumOffset specificOffset; /** * Data for pending but uncommitted offsets. @@ -221,6 +226,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongMetric; + private String inlongAudit; + private SourceMetricData sourceMetricData; // --------------------------------------------------------------------------------------- @@ -230,12 +237,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> Properties properties, @Nullable DebeziumOffset specificOffset, Validator validator, - String inlongMetric) { + String inlongMetric, + String inlongAudit) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; } @Override @@ -413,7 +422,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; - sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup); + AuditImp auditImp = null; + if (inlongAudit != null) { + AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); + auditImp = AuditImp.getInstance(); + } + sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); sourceMetricData.registerMetricsForNumRecordsIn(); sourceMetricData.registerMetricsForNumBytesIn(); sourceMetricData.registerMetricsForNumBytesInPerSecond(); @@ -458,9 +472,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { if (sourceMetricData != null) { - sourceMetricData.getNumRecordsIn().inc(1L); - sourceMetricData.getNumBytesIn() - .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length); + sourceMetricData.outputMetrics(1L, + record.value().toString().getBytes(StandardCharsets.UTF_8).length); } deserializer.deserialize(record, out); } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java index 4caf6be21..0c1a18c13 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java @@ -71,6 +71,7 @@ public class MySqlSource { private StartupOptions startupOptions = StartupOptions.initial(); private DebeziumDeserializationSchema<T> deserializer; private String inlongMetric; + private String inlongAudit; public Builder<T> hostname(String hostname) { this.hostname = hostname; @@ -173,6 +174,11 @@ public class MySqlSource { return this; } + public Builder<T> inlongAudit(String inlongAudit) { + this.inlongAudit = inlongAudit; + return this; + } + /** * builder */ @@ -267,7 +273,7 @@ public class MySqlSource { } return new DebeziumSourceFunction<>( - deserializer, props, specificOffset, new MySqlValidator(props), inlongMetric); + deserializer, props, specificOffset, new MySqlValidator(props), inlongMetric, inlongAudit); } } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java index 3724d2a8e..df48edec2 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java @@ -36,6 +36,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema; import org.apache.inlong.sort.cdc.mysql.MySqlValidator; import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils; @@ -60,9 +62,12 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode; import org.apache.kafka.connect.source.SourceRecord; import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.function.Supplier; +import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables; import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection; @@ -144,16 +149,20 @@ public class MySqlSource<T> MySqlSourceConfig sourceConfig = configFactory.createConfig(readerContext.getIndexOfSubtask()); String inlongMetric = sourceConfig.getInlongMetric(); + String inlongAudit = sourceConfig.getInlongAudit(); if (StringUtils.isNotEmpty(inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split("&"); - String groupId = inlongMetricArray[0]; - String streamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - sourceReaderMetrics.registerMetricsForNumBytesIn(groupId, streamId, nodeId, "numBytesIn"); - sourceReaderMetrics.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn"); - sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, "numBytesInPerSecond"); - sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId, - "numRecordsInPerSecond"); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]); + sourceReaderMetrics.setInlongGroupId(inlongMetricArray[1]); + sourceReaderMetrics.setInlongGroupId(inlongMetricArray[2]); + if (inlongAudit != null) { + AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); + sourceReaderMetrics.setAuditImp(AuditImp.getInstance()); + } + sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN); + sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN); + sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND); + sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND); } FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue = new FutureCompletingBlockingQueue<>(); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java index 015dc111e..41f1c31e9 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java @@ -257,6 +257,11 @@ public class MySqlSourceBuilder<T> { return this; } + public MySqlSourceBuilder<T> inlongAudit(String inlongAudit) { + this.configFactory.inlongAudit(inlongAudit); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java index 2104e6f0b..d21d04836 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java @@ -68,6 +68,7 @@ public class MySqlSourceConfig implements Serializable { private final MySqlConnectorConfig dbzMySqlConfig; private final String inlongMetric; + private final String inlongAudit; MySqlSourceConfig( String hostname, @@ -91,7 +92,8 @@ public class MySqlSourceConfig implements Serializable { boolean scanNewlyAddedTableEnabled, Properties dbzProperties, Properties jdbcProperties, - String inlongMetric) { + String inlongMetric, + String inlongAudit) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -116,6 +118,7 @@ public class MySqlSourceConfig implements Serializable { this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration); this.jdbcProperties = jdbcProperties; this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; } public String getHostname() { @@ -218,4 +221,8 @@ public class MySqlSourceConfig implements Serializable { public String getInlongMetric() { return inlongMetric; } + + public String getInlongAudit() { + return inlongAudit; + } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java index 2bf6507ac..9eff7b8ee 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java @@ -75,12 +75,18 @@ public class MySqlSourceConfigFactory implements Serializable { private Properties dbzProperties; private String inlongMetric; + private String inlongAudit; public MySqlSourceConfigFactory inlongMetric(String inlongMetric) { this.inlongMetric = inlongMetric; return this; } + public MySqlSourceConfigFactory inlongAudit(String inlongAudit) { + this.inlongAudit = inlongAudit; + return this; + } + public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; return this; @@ -341,10 +347,6 @@ public class MySqlSourceConfigFactory implements Serializable { jdbcProperties = new Properties(); } - if (inlongMetric == null) { - inlongMetric = ""; - } - return new MySqlSourceConfig( hostname, port, @@ -367,6 +369,7 @@ public class MySqlSourceConfigFactory implements Serializable { scanNewlyAddedTableEnabled, props, jdbcProperties, - inlongMetric); + inlongMetric, + inlongAudit); } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java index ca32dd4a9..8a211cf19 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java @@ -30,12 +30,6 @@ import java.time.Duration; */ public class MySqlSourceOptions { - public static final ConfigOption<String> INLONG_METRIC = - ConfigOptions.key("inlong.metric") - .stringType() - .defaultValue("") - .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID"); - public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname") .stringType() diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java index fc375b1d0..b301ed572 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -23,6 +23,8 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; +import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader; /** @@ -59,6 +61,10 @@ public class MySqlSourceReaderMetrics { private static String STREAM_ID = "streamId"; private static String GROUP_ID = "groupId"; private static String NODE_ID = "nodeId"; + private String inlongGroupId; + private String inlongSteamId; + private String nodeId; + private AuditImp auditImp; public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -70,29 +76,30 @@ public class MySqlSourceReaderMetrics { metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime); } - public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) { + public void registerMetricsForNumRecordsIn(String metricName) { numRecordsIn = - metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId) + metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) + .addGroup(NODE_ID, this.nodeId) .counter(metricName); } - public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) { + public void registerMetricsForNumBytesIn(String metricName) { numBytesIn = - metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId) + metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) + .addGroup(NODE_ID, this.nodeId) .counter(metricName); } - public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId, - String metricName) { - numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, - nodeId) - .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS)); + public void registerMetricsForNumRecordsInPerSecond(String metricName) { + numRecordsInPerSecond = + metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) + .addGroup(NODE_ID, nodeId) + .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS)); } - public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId, - String metricName) { - numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId) - .addGroup(NODE_ID, nodeId) + public void registerMetricsForNumBytesInPerSecond(String metricName) { + numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId) + .addGroup(NODE_ID, this.nodeId) .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS)); } @@ -139,4 +146,62 @@ public class MySqlSourceReaderMetrics { public Meter getNumBytesInPerSecond() { return numBytesInPerSecond; } + + public String getInlongGroupId() { + return inlongGroupId; + } + + public String getInlongSteamId() { + return inlongSteamId; + } + + public String getNodeId() { + return nodeId; + } + + public void setInlongGroupId(String inlongGroupId) { + this.inlongGroupId = inlongGroupId; + } + + public void setInlongSteamId(String inlongSteamId) { + this.inlongSteamId = inlongSteamId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public AuditImp getAuditImp() { + return auditImp; + } + + public void setAuditImp(AuditImp auditImp) { + this.auditImp = auditImp; + } + + public void outputMetrics(long rowCountSize, long rowDataSize) { + outputMetricForFlink(rowCountSize, rowDataSize); + outputMetricForAudit(rowCountSize, rowDataSize); + } + + public void outputMetricForAudit(long rowCountSize, long rowDataSize) { + if (this.auditImp != null) { + this.auditImp.add( + Constants.AUDIT_SORT_INPUT, + getInlongGroupId(), + getInlongSteamId(), + System.currentTimeMillis(), + rowCountSize, + rowDataSize); + } + } + + public void outputMetricForFlink(long rowCountSize, long rowDataSize) { + if (this.numBytesIn != null) { + numBytesIn.inc(rowDataSize); + } + if (this.numRecordsIn != null) { + this.numRecordsIn.inc(rowCountSize); + } + } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java index 3cfceb567..d2cc328f9 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java @@ -145,13 +145,8 @@ public final class MySqlRecordEmitter<T> new Collector<T>() { @Override public void collect(final T t) { - if (sourceReaderMetrics.getNumRecordsIn() != null) { - sourceReaderMetrics.getNumRecordsIn().inc(1L); - } - if (sourceReaderMetrics.getNumBytesIn() != null) { - sourceReaderMetrics.getNumBytesIn() - .inc(t.toString().getBytes(StandardCharsets.UTF_8).length); - } + sourceReaderMetrics.outputMetrics(1L, + t.toString().getBytes(StandardCharsets.UTF_8).length); output.collect(t); } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java index 3ffd679f7..c3c475e92 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java @@ -26,6 +26,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions; import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions; import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange; @@ -37,6 +38,8 @@ import java.util.Set; import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.APPEND_MODE; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; @@ -46,7 +49,6 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions. import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME; -import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INLONG_METRIC; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD; import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT; @@ -121,6 +123,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory final ReadableConfig config = helper.getOptions(); final String inlongMetric = config.get(INLONG_METRIC); + final String inlongAudit = config.get(INLONG_AUDIT); + ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); final String hostname = config.get(HOSTNAME); final String username = config.get(USERNAME); final String password = config.get(PASSWORD); @@ -186,7 +190,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), heartbeatInterval, migrateAll, - inlongMetric); + inlongMetric, + inlongAudit); } @Override @@ -229,6 +234,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(HEARTBEAT_INTERVAL); options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); return options; } @@ -284,7 +290,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory * Checks the given regular expression's syntax is valid. * * @param optionName the option name of the regex - * @param regex The regular expression to be checked + * @param regex The regular expression to be checked * @throws ValidationException If the expression's syntax is invalid */ private void validateRegex(String optionName, String regex) { diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java index 7038a0e3a..621f122f6 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java @@ -82,6 +82,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final Duration heartbeatInterval; private final boolean migrateAll; private final String inlongMetric; + private final String inlongAudit; // -------------------------------------------------------------------------------------------- // Mutable attributes // -------------------------------------------------------------------------------------------- @@ -123,7 +124,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat StartupOptions startupOptions, Duration heartbeatInterval, boolean migrateAll, - String inlongMetric) { + String inlongMetric, + String inlongAudit) { this( physicalSchema, port, @@ -150,7 +152,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat new Properties(), heartbeatInterval, migrateAll, - inlongMetric); + inlongMetric, + inlongAudit); } /** @@ -182,7 +185,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat Properties jdbcProperties, Duration heartbeatInterval, boolean migrateAll, - String inlongMetric) { + String inlongMetric, + String inlongAudit) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -212,6 +216,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat this.heartbeatInterval = heartbeatInterval; this.migrateAll = migrateAll; this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; } @Override @@ -271,6 +276,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat .jdbcProperties(jdbcProperties) .heartbeatInterval(heartbeatInterval) .inlongMetric(inlongMetric) + .inlongAudit(inlongAudit) .build(); return SourceProvider.of(parallelSource); } else { @@ -286,6 +292,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat .debeziumProperties(dbzProperties) .startupOptions(startupOptions) .inlongMetric(inlongMetric) + .inlongAudit(inlongAudit) .deserializer(deserializer); Optional.ofNullable(serverId) .ifPresent(serverId -> builder.serverId(Integer.parseInt(serverId))); @@ -358,7 +365,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat jdbcProperties, heartbeatInterval, migrateAll, - inlongMetric); + inlongMetric, + inlongAudit); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -397,7 +405,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat && Objects.equals(producedDataType, that.producedDataType) && Objects.equals(metadataKeys, that.metadataKeys) && Objects.equals(jdbcProperties, that.jdbcProperties) - && Objects.equals(inlongMetric, that.inlongMetric); + && Objects.equals(inlongMetric, that.inlongMetric) + && Objects.equals(inlongAudit, that.inlongAudit); } @Override @@ -427,7 +436,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat metadataKeys, scanNewlyAddedTableEnabled, jdbcProperties, - inlongMetric); + inlongMetric, + inlongAudit); } @Override