This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 704e3c0eb [INLONG-7503][Sort] Support multiple audit ids and introduce
timestamp collector (#7552)
704e3c0eb is described below
commit 704e3c0eb382c293468fd47e56dbf8c85713690b
Author: Schnapps <[email protected]>
AuthorDate: Fri Mar 10 10:34:24 2023 +0800
[INLONG-7503][Sort] Support multiple audit ids and introduce timestamp
collector (#7552)
---
inlong-sort/sort-common/pom.xml | 6 +++
.../org/apache/inlong/sort/util/AuditUtils.java | 54 +++++++++++++++++++++
inlong-sort/sort-connectors/base/pom.xml | 7 +++
.../org/apache/inlong/sort/base/Constants.java | 17 ++++---
.../inlong/sort/base/metric/MetricOption.java | 51 +++++++++++++-------
.../inlong/sort/base/metric/SourceMetricData.java | 55 +++++++++++++++++-----
.../sort/cdc/base/source/IncrementalSource.java | 2 +-
.../table/DorisDynamicSchemaOutputFormat.java | 2 +-
.../sort/doris/table/DorisDynamicTableFactory.java | 2 +
.../sort/elasticsearch/ElasticsearchSinkBase.java | 2 +-
.../sort/filesystem/FileSystemTableFactory.java | 2 +
.../filesystem/stream/AbstractStreamingWriter.java | 2 +-
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 2 +-
.../hive/filesystem/AbstractStreamingWriter.java | 2 +-
.../sort/hive/table/HiveTableInlongFactory.java | 2 +
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 +
.../sink/multiple/DynamicSchemaHandleOperator.java | 2 +-
.../sink/multiple/IcebergMultipleStreamWriter.java | 2 +-
.../sink/multiple/IcebergSingleStreamWriter.java | 2 +-
.../jdbc/internal/JdbcBatchingOutputFormat.java | 2 +-
.../internal/JdbcMultiBatchingOutputFormat.java | 2 +-
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 2 +
.../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 2 +-
.../inlong/sort/kafka/FlinkKafkaProducer.java | 2 +-
.../sort/kafka/table/KafkaDynamicTableFactory.java | 2 +
.../sort/kudu/sink/AbstractKuduSinkFunction.java | 2 +-
.../sort/kudu/table/KuduDynamicTableFactory.java | 2 +
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 2 +-
.../mongodb/table/MongoDBTableSourceFactory.java | 2 +
.../sort/cdc/debezium/DebeziumSourceFunction.java | 2 +-
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 2 +-
.../mysql/table/MySqlTableInlongSourceFactory.java | 2 +
.../sort/cdc/base/source/IncrementalSource.java | 2 +-
.../oracle/debezium/DebeziumSourceFunction.java | 2 +-
.../cdc/oracle/table/OracleTableSourceFactory.java | 2 +
.../sort/cdc/postgres/DebeziumSourceFunction.java | 2 +-
.../cdc/postgres/table/PostgreSQLTableFactory.java | 2 +
inlong-sort/sort-connectors/pulsar/pom.xml | 6 +++
.../inlong/sort/pulsar/FlinkPulsarSource.java | 9 +++-
.../table/DynamicPulsarDeserializationSchema.java | 16 ++-----
.../pulsar/table/PulsarDynamicTableFactory.java | 12 +++--
.../pulsar/table/PulsarDynamicTableSource.java | 14 ++++--
.../table/UpsertPulsarDynamicTableFactory.java | 4 +-
.../pulsar/withoutadmin/FlinkPulsarSource.java | 9 +++-
.../sort/pulsar/withoutadmin/MetricsCollector.java | 55 ++++++++++++++++++++++
.../sort/redis/sink/AbstractRedisSinkFunction.java | 2 +-
.../sort/redis/table/RedisDynamicTableFactory.java | 2 +
.../sqlserver/table/DebeziumSourceFunction.java | 2 +-
.../cdc/sqlserver/table/SqlServerTableFactory.java | 2 +
.../table/sink/StarRocksDynamicSinkFunction.java | 2 +-
.../base/collectors/TimestampedCollector.java | 30 ++++++++++++
.../inlongmsg/InLongMsgDeserializationSchema.java | 14 ++++--
52 files changed, 348 insertions(+), 83 deletions(-)
diff --git a/inlong-sort/sort-common/pom.xml b/inlong-sort/sort-common/pom.xml
index bfb3f20fa..f89c02447 100644
--- a/inlong-sort/sort-common/pom.xml
+++ b/inlong-sort/sort-common/pom.xml
@@ -60,6 +60,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
new file mode 100644
index 000000000..30e8c3f8f
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class AuditUtils {
+
+ public static final String DELIMITER = "&";
+
+ private static final String IP_OR_HOST_PORT =
"^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+ + "2}|[1-9]\\d{"
+ + "3}|[1-5]\\d{"
+ + "4}|6[0-4]\\d{"
+ + "3}|65[0-4]\\d{"
+ + "2}|655[0-2]\\d|6553[0-5])$";
+
+ public static HashSet<String> extractAuditIpPorts(String inlongAudit) {
+ HashSet<String> ipPortList = new HashSet<>();
+ String[] ipPorts = inlongAudit.split(DELIMITER);
+ for (String ipPort : ipPorts) {
+ Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT,
ipPort),
+ "Error inLong audit format: " + inlongAudit);
+ ipPortList.add(ipPort);
+ }
+ return ipPortList;
+ }
+
+ public static List<Integer> extractAuditKeys(String auditKeys) {
+ return Arrays.stream(auditKeys.split(DELIMITER)).map(Integer::valueOf)
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/base/pom.xml
b/inlong-sort/sort-connectors/base/pom.xml
index 23a8f597c..3942a5887 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -48,6 +48,13 @@
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7af393fe7..ac2a7c3a9 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -76,6 +76,11 @@ public final class Constants {
* Node id used in inlong metric
*/
public static final String NODE_ID = "nodeId";
+ // sort received successfully
+ public static final String AUDIT_SORT_INPUT = "7";
+
+ // sort send successfully
+ public static final Integer AUDIT_SORT_OUTPUT = 8;
/**
* Database Name used in inlong metric
*/
@@ -117,12 +122,6 @@ public final class Constants {
*/
public static final String KEY_VALUE_DELIMITER = "=";
- // sort received successfully
- public static final Integer AUDIT_SORT_INPUT = 7;
-
- // sort send successfully
- public static final Integer AUDIT_SORT_OUTPUT = 8;
-
public static final String INLONG_METRIC_STATE_NAME =
"inlong-metric-states";
/**
@@ -149,6 +148,12 @@ public final class Constants {
.withDescription("Audit proxy host address for reporting
audit metrics. \n"
+ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+ public static final ConfigOption<String> AUDIT_KEYS =
+ ConfigOptions.key("metrics.audit.key")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Audit keys for metrics collecting");
+
public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
ConfigOptions.key("sink.ignore.changelog")
.booleanType()
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index 91e9f9f89..cd6a6b5b7 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.base.metric;
+import java.util.List;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -26,25 +27,24 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.regex.Pattern;
import java.util.stream.Stream;
+import org.apache.inlong.sort.util.AuditUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.GROUP_ID;
import static org.apache.inlong.sort.base.Constants.STREAM_ID;
public class MetricOption implements Serializable {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricOption.class);
+
private static final long serialVersionUID = 1L;
- private static final String IP_OR_HOST_PORT =
"^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
- + "2}|[1-9]\\d{"
- + "3}|[1-5]\\d{"
- + "4}|6[0-4]\\d{"
- + "3}|65[0-4]\\d{"
- + "2}|655[0-2]\\d|6553[0-5])$";
private Map<String, String> labels;
- private final HashSet<String> ipPortList;
+ private HashSet<String> ipPortList;
private String ipPorts;
private RegisteredMetric registeredMetric;
private long initRecords;
@@ -52,6 +52,7 @@ public class MetricOption implements Serializable {
private long initDirtyRecords;
private long initDirtyBytes;
private long readPhase;
+ private List<Integer> inlongAuditKeys;
private MetricOption(
String inlongLabels,
@@ -61,7 +62,8 @@ public class MetricOption implements Serializable {
long initBytes,
Long initDirtyRecords,
Long initDirtyBytes,
- Long readPhase) {
+ Long readPhase,
+ String inlongAuditKeys) {
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
"Inlong labels must be set for register metric.");
@@ -80,17 +82,22 @@ public class MetricOption implements Serializable {
labels.put(key, value);
});
- this.ipPortList = new HashSet<>();
this.ipPorts = inlongAudit;
+
if (ipPorts != null) {
+
Preconditions.checkArgument(labels.containsKey(GROUP_ID) &&
labels.containsKey(STREAM_ID),
"groupId and streamId must be set when enable inlong audit
collect.");
- String[] ipPortStrs = inlongAudit.split(DELIMITER);
- for (String ipPort : ipPortStrs) {
- Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT,
ipPort),
- "Error inLong audit format: " + inlongAudit);
- this.ipPortList.add(ipPort);
+
+ if (inlongAuditKeys == null) {
+ LOG.warn("should set inlongAuditKeys when enable inlong audit
collect, "
+ + "fallback to use id {} as audit key",
AUDIT_SORT_INPUT);
+ inlongAuditKeys = AUDIT_SORT_INPUT;
}
+
+ this.inlongAuditKeys =
AuditUtils.extractAuditKeys(inlongAuditKeys);
+ this.ipPortList = AuditUtils.extractAuditIpPorts(ipPorts);
+
}
if (registeredMetric != null) {
@@ -138,6 +145,10 @@ public class MetricOption implements Serializable {
this.initDirtyRecords = initDirtyRecords;
}
+ public List<Integer> getInlongAuditKeys() {
+ return inlongAuditKeys;
+ }
+
public long getInitDirtyBytes() {
return initDirtyBytes;
}
@@ -168,6 +179,7 @@ public class MetricOption implements Serializable {
private String inlongLabels;
private String inlongAudit;
+ private String inlongAuditKeys;
private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
private long initRecords = 0L;
private long initBytes = 0L;
@@ -183,11 +195,16 @@ public class MetricOption implements Serializable {
return this;
}
- public MetricOption.Builder withInlongAudit(String inlongAudit) {
+ public MetricOption.Builder withAuditAddress(String inlongAudit) {
this.inlongAudit = inlongAudit;
return this;
}
+ public MetricOption.Builder withAuditKeys(String inlongAuditIds) {
+ this.inlongAuditKeys = inlongAuditIds;
+ return this;
+ }
+
public MetricOption.Builder withRegisterMetric(RegisteredMetric
registeredMetric) {
this.registeredMetric = registeredMetric;
return this;
@@ -223,7 +240,7 @@ public class MetricOption implements Serializable {
return null;
}
return new MetricOption(inlongLabels, inlongAudit,
registeredMetric, initRecords, initBytes,
- initDirtyRecords, initDirtyBytes, initReadPhase);
+ initDirtyRecords, initDirtyBytes, initReadPhase,
inlongAuditKeys);
}
}
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e365103fe..e5ffdf844 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -17,15 +17,17 @@
package org.apache.inlong.sort.base.metric;
+import java.util.List;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditOperator;
-import org.apache.inlong.sort.base.Constants;
import java.nio.charset.StandardCharsets;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
@@ -39,6 +41,7 @@ import static
org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
*/
public class SourceMetricData implements MetricData {
+ private static final Logger LOG =
LoggerFactory.getLogger(SourceMetricData.class);
private final MetricGroup metricGroup;
private final Map<String, String> labels;
private Counter numRecordsIn;
@@ -48,6 +51,7 @@ public class SourceMetricData implements MetricData {
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
private AuditOperator auditOperator;
+ private List<Integer> auditKeys;
public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -71,6 +75,7 @@ public class SourceMetricData implements MetricData {
if (option.getIpPorts().isPresent()) {
AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
this.auditOperator = AuditOperator.getInstance();
+ this.auditKeys = option.getInlongAuditKeys();
}
}
@@ -193,7 +198,45 @@ public class SourceMetricData implements MetricData {
outputMetrics(1, size);
}
+ public void outputMetricsWithEstimate(Object data, long dataTime) {
+ long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
+ outputMetrics(1, size, dataTime);
+ }
+
public void outputMetrics(long rowCountSize, long rowDataSize) {
+ outputDefaultMetrics(rowCountSize, rowDataSize);
+
+ if (auditOperator != null) {
+ for (Integer key : auditKeys) {
+ auditOperator.add(
+ key,
+ getGroupId(),
+ getStreamId(),
+ System.currentTimeMillis(),
+ rowCountSize,
+ rowDataSize);
+ }
+
+ }
+ }
+
+ public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime) {
+ outputDefaultMetrics(rowCountSize, rowDataSize);
+
+ if (auditOperator != null) {
+ for (Integer key : auditKeys) {
+ auditOperator.add(
+ key,
+ getGroupId(),
+ getStreamId(),
+ dataTime,
+ rowCountSize,
+ rowDataSize);
+ }
+ }
+ }
+
+ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
if (numRecordsIn != null) {
this.numRecordsIn.inc(rowCountSize);
}
@@ -209,16 +252,6 @@ public class SourceMetricData implements MetricData {
if (numBytesInForMeter != null) {
this.numBytesInForMeter.inc(rowDataSize);
}
-
- if (auditOperator != null) {
- auditOperator.add(
- Constants.AUDIT_SORT_INPUT,
- getGroupId(),
- getStreamId(),
- System.currentTimeMillis(),
- rowCountSize,
- rowDataSize);
- }
}
@Override
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index d74e4ea72..de3bec22e 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -122,7 +122,7 @@ public class IncrementalSource<T, C extends SourceConfig>
// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(jdbcSourceConfig.getInlongMetric())
- .withInlongAudit(jdbcSourceConfig.getInlongAudit())
+ .withAuditAddress(jdbcSourceConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index ebc8cfca2..2ee979d15 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -253,7 +253,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index a47fd071b..e5e482c29 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -55,6 +55,7 @@ import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -217,6 +218,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(FactoryUtil.SINK_PARALLELISM);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 3de33e5cf..e26ebdb10 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -301,7 +301,7 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
index b85f9ac6d..637876abf 100644
---
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -50,6 +50,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
@@ -133,6 +134,7 @@ public class FileSystemTableFactory implements
DynamicTableSourceFactory, Dynami
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(IGNORE_ALL_CHANGELOG);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 1e7195e07..3d8531bd4 100644
---
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -136,7 +136,7 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
super.open();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 76df0cc9e..1d62d8f56 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -155,7 +155,7 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
this.runtimeContext = getRuntimeContext();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 4a0298384..4a359ad7e 100644
---
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -155,7 +155,7 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
super.open();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 054fb4ab4..aa33d3b31 100644
---
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -51,6 +51,7 @@ import static
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOp
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
import static
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.hive.HiveOptions.HIVE_DATABASE;
@@ -91,6 +92,7 @@ public class HiveTableInlongFactory implements
DynamicTableSourceFactory, Dynami
options.add(HADOOP_CONF_DIR);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index e872d248b..c6d3ed874 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -50,6 +50,7 @@ import
org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import java.util.Map;
import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -264,6 +265,7 @@ public class FlinkDynamicTableFactory implements
DynamicTableSinkFactory, Dynami
options.add(IGNORE_ALL_CHANGELOG);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
options.add(SINK_MULTIPLE_ENABLE);
options.add(SINK_MULTIPLE_FORMAT);
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index b95278223..12914098e 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -141,7 +141,7 @@ public class DynamicSchemaHandleOperator extends
AbstractStreamOperator<RecordWi
// Initialize metric
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 7d2973a3d..b6c5f9851 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -136,7 +136,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
this.runtimeContext = getRuntimeContext();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 502c47008..d44927db1 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -113,7 +113,7 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
if (!multipleSink) {
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index ec3331b4a..07ea97e70 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -155,7 +155,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
this.runtimeContext = getRuntimeContext();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index bf6964927..98f044c55 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -179,7 +179,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
this.runtimeContext = getRuntimeContext();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index e12128ec8..39a8daaa1 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -46,6 +46,7 @@ import
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.JdbcUrlUtils;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -393,6 +394,7 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
optionalOptions.add(INLONG_METRIC);
optionalOptions.add(INLONG_AUDIT);
+ optionalOptions.add(AUDIT_KEYS);
return optionalOptions;
}
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index b41d75762..b3aa4dbbb 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -829,7 +829,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
public void run(SourceContext<T> sourceContext) throws Exception {
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withRegisterMetric(RegisteredMetric.ALL)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 93a84c733..935a54afa 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -934,7 +934,7 @@ public class FlinkKafkaProducer<IN>
}
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index d92d5cfed..f2d48e95a 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -98,6 +98,7 @@ import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -309,6 +310,7 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
options.add(KAFKA_IGNORE_ALL_CHANGELOG);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
options.add(SINK_MULTIPLE_FORMAT);
options.add(SINK_MULTIPLE_PARTITION_PATTERN);
return options;
diff --git
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
index b41914073..6acb9cd32 100644
---
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
+++
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
@@ -115,7 +115,7 @@ public abstract class AbstractKuduSinkFunction
this.running = true;
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inLongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
index a33c4183b..642b9b617 100644
---
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
@@ -39,6 +39,7 @@ import java.util.Optional;
import java.util.Set;
import static
org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.kudu.common.KuduOptions.CONNECTOR_MASTERS;
@@ -177,6 +178,7 @@ public class KuduDynamicTableFactory
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
return options;
}
}
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 83bc75e49..1a763a16c 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -450,7 +450,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
"sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 7fe19e27f..9e15493bb 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -43,6 +43,7 @@ import static
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOp
import static
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -166,6 +167,7 @@ public class MongoDBTableSourceFactory implements
DynamicTableSourceFactory {
options.add(SOURCE_MULTIPLE_ENABLE);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
return options;
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index d99da8389..43ee4f53b 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -450,7 +450,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
"sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 1aef8af2e..ea79e14b8 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -144,7 +144,7 @@ public class MySqlSource<T>
configFactory.createConfig(readerContext.getIndexOfSubtask());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(sourceConfig.getInlongMetric())
- .withInlongAudit(sourceConfig.getInlongAudit())
+ .withAuditAddress(sourceConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();
sourceReaderMetrics.registerMetrics(metricOption);
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 35ee07e63..6aaf06ad7 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -235,6 +236,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(ROW_KINDS_FILTERED);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index 48e711eb9..1e6cd1245 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -124,7 +124,7 @@ public class IncrementalSource<T, C extends SourceConfig>
// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(jdbcSourceConfig.getInlongMetric())
- .withInlongAudit(jdbcSourceConfig.getInlongAudit())
+ .withAuditAddress(jdbcSourceConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index 0416424a5..7ee86f214 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -452,7 +452,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
"sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index a91976b7c..99aed8f12 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -53,6 +53,7 @@ import static
com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static
com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.PORT;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -166,6 +167,7 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index ef3f744de..b1a7a5851 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -452,7 +452,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
"sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index af134c935..3777f9f6e 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -30,6 +30,7 @@ import java.util.Set;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -212,6 +213,7 @@ public class PostgreSQLTableFactory implements
DynamicTableSourceFactory {
options.add(SOURCE_MULTIPLE_ENABLE);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
options.add(APPEND_MODE);
options.add(ROW_KINDS_FILTERED);
return options;
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-connectors/pulsar/pom.xml
index c7b493def..5d5e3da54 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -62,6 +62,12 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
index 9fb38d238..9c83bbc58 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
@@ -244,13 +244,16 @@ public class FlinkPulsarSource<T>
protected String auditHostAndPorts;
+ protected String auditKeys;
+
public FlinkPulsarSource(
String adminUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ String auditKeys) {
this.adminUrl = checkNotNull(adminUrl);
this.clientConfigurationData = checkNotNull(clientConf);
this.deserializer = deserializer;
@@ -276,6 +279,7 @@ public class FlinkPulsarSource<T>
this.oldStateVersion =
SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = inlongAudit;
+ this.auditKeys = auditKeys;
}
// ------------------------------------------------------------------------
@@ -446,7 +450,8 @@ public class FlinkPulsarSource<T>
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
.withRegisterMetric(RegisteredMetric.ALL)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 60212768b..f3c71e029 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -30,7 +30,7 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
+import org.apache.inlong.sort.pulsar.withoutadmin.MetricsCollector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
@@ -120,12 +120,8 @@ public class DynamicPulsarDeserializationSchema implements
PulsarDeserialization
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
- valueDeserialization.deserialize(message.getData(), new
CallbackCollector<>(inputRow -> {
- if (sourceMetricData != null) {
- sourceMetricData.outputMetricsWithEstimate(inputRow);
- }
- collector.collect(inputRow);
- }));
+ valueDeserialization.deserialize(message.getData(),
+ new MetricsCollector<>(collector, sourceMetricData));
return;
}
BufferingCollector keyCollector = new BufferingCollector();
@@ -143,10 +139,8 @@ public class DynamicPulsarDeserializationSchema implements
PulsarDeserialization
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(message.getData(), new
CallbackCollector<>(inputRow -> {
- sourceMetricData.outputMetricsWithEstimate(inputRow);
- outputCollector.collect(inputRow);
- }));
+ valueDeserialization.deserialize(message.getData(), new
MetricsCollector<>(
+ outputCollector, sourceMetricData));
}
keyCollector.buffer.clear();
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index bd5be42b2..c6d6a19e4 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -76,6 +76,7 @@ import static
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -275,6 +276,8 @@ public class PulsarDynamicTableFactory
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
return createPulsarTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -289,7 +292,7 @@ public class PulsarDynamicTableFactory
properties,
startupOptions,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts, auditKeys);
}
@Override
@@ -329,6 +332,7 @@ public class PulsarDynamicTableFactory
options.add(PROPERTIES);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
return options;
}
@@ -361,7 +365,8 @@ public class PulsarDynamicTableFactory
Properties properties,
PulsarTableOptions.StartupOptions startupOptions,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ String auditKeys) {
return new PulsarDynamicTableSource(
physicalDataType,
keyDecodingFormat,
@@ -377,6 +382,7 @@ public class PulsarDynamicTableFactory
startupOptions,
false,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ auditKeys);
}
}
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index a0eeb961d..e9465fcb2 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -141,6 +141,7 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
protected String inlongMetric;
+ protected String auditKeys;
protected String auditHostAndPorts;
public PulsarDynamicTableSource(
@@ -158,7 +159,8 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
PulsarTableOptions.StartupOptions startupOptions,
boolean upsertMode,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ String auditKeys) {
this.producedDataType = physicalDataType;
setTopicInfo(properties, topics, topicPattern);
@@ -187,6 +189,7 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
this.upsertMode = upsertMode;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
private void setTopicInfo(Properties properties, List<String> topics,
String topicPattern) {
@@ -277,7 +280,8 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
deserializationSchema,
properties,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ auditKeys);
if (watermarkStrategy != null) {
source.assignTimestampsAndWatermarks(watermarkStrategy);
@@ -312,7 +316,8 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
deserializationSchema,
properties,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ auditKeys);
if (watermarkStrategy != null) {
source.assignTimestampsAndWatermarks(watermarkStrategy);
@@ -349,7 +354,8 @@ public class PulsarDynamicTableSource implements
ScanTableSource, SupportsReadin
startupOptions,
false,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ auditKeys);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index ef68c44ed..502a10a62 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -66,6 +66,7 @@ import static
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
import static
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -188,6 +189,7 @@ public class UpsertPulsarDynamicTableFactory implements
DynamicTableSourceFactor
String topicPattern = tableOptions.get(TOPIC_PATTERN);
String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
return new PulsarDynamicTableSource(
schema.toPhysicalRowDataType(),
@@ -204,7 +206,7 @@ public class UpsertPulsarDynamicTableFactory implements
DynamicTableSourceFactor
startupOptions,
true,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts, auditKeys);
}
@Override
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
index 8150fb894..2a84ec041 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
@@ -243,6 +243,8 @@ public class FlinkPulsarSource<T>
*/
private String inlongAudit;
+ private String inlongAuditKeys;
+
private SourceMetricData sourceMetricData;
private transient ListState<MetricState> metricStateListState;
@@ -253,7 +255,8 @@ public class FlinkPulsarSource<T>
PulsarDeserializationSchema<T> deserializer,
Properties properties,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ String inlongAuditKeys) {
this.inlongAudit = inlongAudit;
this.inlongMetric = inlongMetric;
this.serverUrl = checkNotNull(serverUrl);
@@ -272,6 +275,7 @@ public class FlinkPulsarSource<T>
SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
this.useMetrics =
SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
+ this.inlongAuditKeys = inlongAuditKeys;
CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
@@ -431,7 +435,8 @@ public class FlinkPulsarSource<T>
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(inlongAudit)
+ .withAuditAddress(inlongAudit)
+ .withAuditKeys(inlongAuditKeys)
.withRegisterMetric(RegisteredMetric.ALL)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
new file mode 100644
index 000000000..947337dbe
--- /dev/null
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.withoutadmin;
+
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
+
+/**
+ * A {@link TimestampedCollector} that reports every record to a {@link SourceMetricData}
+ * before forwarding it to the wrapped {@link Collector}.
+ *
+ * @param <T> type of the collected records
+ */
+public class MetricsCollector<T> implements TimestampedCollector<T> {
+
+    /** Downstream collector that receives every record after it has been reported. */
+    private final Collector<T> collector;
+
+    /** Timestamp passed along with each reported record; 0 until {@link #resetTimestamp(long)} is called. */
+    private long timestampMillis;
+
+    // NOTE(review): left package-private and non-final as in the original, in case classes in this
+    // package access the field directly; tighten to private final once that is confirmed.
+    SourceMetricData metricData;
+
+    /**
+     * @param collector collector the records are forwarded to
+     * @param sourceMetricData metric data every collected record is reported to
+     */
+    public MetricsCollector(Collector<T> collector,
+            SourceMetricData sourceMetricData) {
+        this.metricData = sourceMetricData;
+        this.collector = collector;
+    }
+
+    /**
+     * Stores the timestamp that is reported with the records collected after this call.
+     *
+     * @param timestampMillis timestamp in milliseconds
+     */
+    @Override
+    public void resetTimestamp(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
+    /**
+     * Reports the record, together with the current timestamp, to the metric data and then forwards
+     * it to the wrapped collector.
+     */
+    @Override
+    public void collect(T record) {
+        metricData.outputMetricsWithEstimate(record, timestampMillis);
+        collector.collect(record);
+    }
+
+    /** Does nothing; in particular the wrapped collector is not closed by this method. */
+    @Override
+    public void close() {
+    }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
index 97ae9b9f2..5f0c61eb4 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
@@ -171,7 +171,7 @@ public abstract class AbstractRedisSinkFunction<OUT>
}
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inLongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
index 8edf979e9..0f2a56e0f 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.redis.table;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.redis.common.config.RedisOptions.DATA_TYPE;
@@ -210,6 +211,7 @@ public class RedisDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
options.add(RedisOptions.SENTINELS_INFO);
options.add(RedisOptions.SOCKET_TIMEOUT);
options.add(RedisOptions.TIMEOUT);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index 07e026c5d..c207f5775 100644
---
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -429,7 +429,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
"sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
diff --git
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
index b0454811e..03747d0f6 100644
---
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+++
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -34,6 +34,7 @@ import java.util.Set;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -161,6 +162,7 @@ public class SqlServerTableFactory implements
DynamicTableSourceFactory {
options.add(SCAN_STARTUP_MODE);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(AUDIT_KEYS);
return options;
}
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 3b70c35bb..08335e0d3 100644
---
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -158,7 +158,7 @@ public class StarRocksDynamicSinkFunction<T> extends
RichSinkFunction<T> impleme
sinkManager.startAsyncFlushing();
MetricOption metricOption =
MetricOption.builder().withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
+ .withAuditAddress(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
new file mode 100644
index 000000000..fa8ad8b09
--- /dev/null
+++
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base.collectors;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link Collector} that can additionally be given a timestamp through {@link #resetTimestamp(long)}.
+ * @param <T> type of the collected records
+ */
+public interface TimestampedCollector<T> extends Collector<T> {
+ /** Hands the collector a timestamp in milliseconds; how it is used is left to the implementation. */
+ void resetTimestamp(long timestampMillis);
+
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index 6b22837c9..9789488ba 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public class InLongMsgDeserializationSchema implements
DeserializationSchema<Row
}
@Override
- public RowData deserialize(byte[] bytes) throws IOException {
+ public RowData deserialize(byte[] bytes) {
throw new RuntimeException(
"Please invoke DeserializationSchema#deserialize(byte[],
Collector<RowData>) instead.");
}
@@ -106,10 +107,14 @@ public class InLongMsgDeserializationSchema implements
DeserializationSchema<Row
continue;
}
+ if (out instanceof TimestampedCollector) {
+ ((TimestampedCollector<RowData>)
out).resetTimestamp(head.getTime().getTime());
+ }
+
List<RowData> list = new ArrayList<>();
ListCollector<RowData> collector = new ListCollector<>(list);
deserializationSchema.deserialize(bodyBytes, collector);
- list.stream().forEach(rowdata -> emitRow(head,
(GenericRowData) rowdata, out));
+ list.forEach(rowdata -> emitRow(head, (GenericRowData)
rowdata, out));
}
}
@@ -153,12 +158,15 @@ public class InLongMsgDeserializationSchema implements
DeserializationSchema<Row
/** add metadata column */
private void emitRow(InLongMsgHead head, GenericRowData physicalRow,
Collector<RowData> out) {
+
if (metadataConverters.length == 0) {
out.collect(physicalRow);
return;
}
- final int physicalArity = physicalRow.getArity();
+
final int metadataArity = metadataConverters.length;
+ final int physicalArity = physicalRow.getArity();
+
final GenericRowData producedRow =
new GenericRowData(physicalRow.getRowKind(), physicalArity +
metadataArity);
for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {