This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new da4e183247 [INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080) da4e183247 is described below commit da4e1832473e55dbb32e54ea27ccac38ca0b742e Author: Sting <zpen...@connect.ust.hk> AuthorDate: Sun Oct 22 14:57:38 2023 +0800 [INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080) --- .../protocol/node/extract/PulsarExtractNode.java | 1 - .../inlong/sort/base/metric/SinkMetricData.java | 4 ++- .../inlong/sort/base/metric/SourceMetricData.java | 23 ++++++++++++++--- .../pulsar/table/PulsarDynamicTableFactory.java | 18 +++---------- .../sort-connectors/pulsar/pom.xml | 6 +++++ .../inlong/sort/pulsar/PulsarTableFactory.java | 18 ++++++++++--- .../table/PulsarTableDeserializationSchema.java | 12 +++++++-- .../PulsarTableDeserializationSchemaFactory.java | 30 ++++++++++++++++++++-- 8 files changed, 85 insertions(+), 27 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index ead53f0e7c..cba35f8683 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -112,7 +112,6 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric { if (adminUrl != null) { options.put("admin-url", adminUrl); } - options.put("generic", "true"); options.put("service-url", serviceUrl); options.put("topic", topic); options.put("scan.startup.mode", scanStartupMode); diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index fa2aee68d3..4b48c4ece2 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -25,6 +25,7 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -41,8 +42,9 @@ import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS /** * A collection class for handling metrics */ -public class SinkMetricData implements MetricData { +public class SinkMetricData implements MetricData, Serializable { + private static final long serialVersionUID = 1L; private final MetricGroup metricGroup; private final Map<String, String> labels; private final RegisteredMetric registeredMetric; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index ec64f53b39..0a0cad5d6e 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -27,6 +27,7 @@ import org.apache.flink.metrics.SimpleCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -43,10 +44,11 @@ import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS /** * A collection class for handling metrics */ -public class SourceMetricData implements MetricData { +public class SourceMetricData implements MetricData, Serializable { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(SourceMetricData.class); - private final MetricGroup metricGroup; + private MetricGroup metricGroup; private final Map<String, String> labels; private Counter numRecordsIn; private Counter numBytesIn; @@ -108,6 +110,16 @@ public class SourceMetricData implements MetricData { } } + public SourceMetricData(MetricOption option) { + this.labels = option.getLabels(); + + if (option.getIpPorts().isPresent()) { + AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + this.auditOperator = AuditOperator.getInstance(); + this.auditKeys = option.getInlongAuditKeys(); + } + } + /** * Default counter is {@link SimpleCounter} * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter @@ -288,20 +300,23 @@ public class SourceMetricData implements MetricData { public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) { outputDefaultMetrics(rowCountSize, rowDataSize); - if (auditOperator != null) { for (Integer key : auditKeys) { auditOperator.add( key, getGroupId(), getStreamId(), - dataTime, + getCurrentOrProvidedTime(dataTime), rowCountSize, rowDataSize); } } } + private long getCurrentOrProvidedTime(long dataTime) { + return dataTime == 0 ? System.currentTimeMillis() : dataTime; + } + private void outputDefaultMetrics(long rowCountSize, long rowDataSize) { if (numRecordsIn != null) { this.numRecordsIn.inc(rowCountSize); diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java index 79af4e0d43..dd06bfe758 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java @@ -39,7 +39,6 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; -import org.apache.pulsar.common.naming.TopicName; import javax.annotation.Nullable; @@ -163,7 +162,7 @@ public class PulsarDynamicTableFactory FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); ReadableConfig tableOptions = helper.getOptions(); - List<String> topics = generateTopic(context.getObjectIdentifier(), tableOptions); + List<String> topics = generateTopic(tableOptions); if (topics != null && !topics.isEmpty()) { ((Configuration) tableOptions).set(TOPIC, Collections.singletonList(topics.get(0))); } @@ -231,7 +230,7 @@ public class PulsarDynamicTableFactory FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); ReadableConfig tableOptions = helper.getOptions(); - List<String> topics = generateTopic(context.getObjectIdentifier(), tableOptions); + List<String> topics = generateTopic(tableOptions); if (topics != null && !topics.isEmpty()) { ((Configuration) tableOptions).set(TOPIC, Collections.singletonList(topics.get(0))); } @@ -338,17 +337,8 @@ public class PulsarDynamicTableFactory return options; } - private List<String> generateTopic(ObjectIdentifier table, ReadableConfig tableOptions) { - List<String> topics = null; - if (tableOptions.get(GENERIC)) { - topics = tableOptions.getOptional(TOPIC).orElse(null); - } else { - String rawTopic = table.getDatabaseName() + "/" + table.getObjectName(); - final String topic = TopicName.get(rawTopic).toString(); - topics = Collections.singletonList(topic); - } - - return topics; + private List<String> generateTopic(ReadableConfig tableOptions) { + return tableOptions.getOptional(TOPIC).orElse(null); } // -------------------------------------------------------------------------------------------- diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml index 501d39c637..9fc0949b46 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml @@ -38,6 +38,12 @@ <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 4adc2e64ba..6784fc6790 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -47,6 +47,9 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat; @@ -137,6 +140,10 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); + String auditKeys = tableOptions.get(AUDIT_KEYS); + final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory = new PulsarTableDeserializationSchemaFactory( physicalDataType, @@ -144,7 +151,10 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { keyProjection, valueDecodingFormat, valueProjection, - UPSERT_DISABLED); + UPSERT_DISABLED, + inlongMetric, + auditHostAndPorts, + auditKeys); // Set default values for configuration not exposed to user. final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForMetadataPushdown = @@ -190,8 +200,10 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { SINK_PARALLELISM, KEY_FORMAT, KEY_FIELDS, - EXPLICIT) - .collect(Collectors.toSet()); + EXPLICIT, + AUDIT_KEYS, + INLONG_METRIC, + INLONG_AUDIT).collect(Collectors.toSet()); } /** Format and Delivery guarantee related options are not forward options. */ diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 8377fce389..237de45b4d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; + import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -53,12 +56,15 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc private final boolean upsertMode; + private SourceMetricData sourceMetricData; + public PulsarTableDeserializationSchema( @Nullable DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization, TypeInformation<RowData> producedTypeInfo, PulsarRowDataConverter rowDataConverter, - boolean upsertMode) { + boolean upsertMode, + SourceMetricData sourceMetricData) { if (upsertMode) { checkNotNull(keyDeserialization, "upsert mode must specify a key format"); } @@ -67,6 +73,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc this.rowDataConverter = checkNotNull(rowDataConverter); this.producedTypeInfo = checkNotNull(producedTypeInfo); this.upsertMode = upsertMode; + this.sourceMetricData = sourceMetricData; } @Override @@ -96,7 +103,8 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc return; } - valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData)); + valueDeserialization.deserialize(message.getData(), + new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData)); rowDataConverter.projectToProducedRowAndCollect( message, keyRowData, valueRowData, collector); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java index 671e82e0d3..360b07aa69 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; + import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; @@ -86,13 +89,21 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { private final boolean upsertMode; + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + private SourceMetricData sourceMetricData; + public PulsarTableDeserializationSchemaFactory( DataType physicalDataType, @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, int[] keyProjection, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, int[] valueProjection, - boolean upsertMode) { + boolean upsertMode, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { this.physicalDataType = checkNotNull(physicalDataType, "field physicalDataType must not be null."); this.keyDecodingFormat = keyDecodingFormat; @@ -105,6 +116,10 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { this.producedDataType = physicalDataType; this.connectorMetadataKeys = Collections.emptyList(); this.upsertMode = upsertMode; + + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; } private @Nullable DeserializationSchema<RowData> createDeserialization( @@ -151,12 +166,23 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { readableMetadata, upsertMode); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } + return new PulsarTableDeserializationSchema( keyDeserialization, valueDeserialization, producedTypeInfo, rowDataConverter, - upsertMode); + upsertMode, + sourceMetricData); } public void setProducedDataType(DataType producedDataType) {