This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new da4e183247 [INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080)
da4e183247 is described below

commit da4e1832473e55dbb32e54ea27ccac38ca0b742e
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Sun Oct 22 14:57:38 2023 +0800

    [INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080)
---
 .../protocol/node/extract/PulsarExtractNode.java   |  1 -
 .../inlong/sort/base/metric/SinkMetricData.java    |  4 ++-
 .../inlong/sort/base/metric/SourceMetricData.java  | 23 ++++++++++++++---
 .../pulsar/table/PulsarDynamicTableFactory.java    | 18 +++----------
 .../sort-connectors/pulsar/pom.xml                 |  6 +++++
 .../inlong/sort/pulsar/PulsarTableFactory.java     | 18 ++++++++++---
 .../table/PulsarTableDeserializationSchema.java    | 12 +++++++--
 .../PulsarTableDeserializationSchemaFactory.java   | 30 ++++++++++++++++++++--
 8 files changed, 85 insertions(+), 27 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index ead53f0e7c..cba35f8683 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -112,7 +112,6 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric {
         if (adminUrl != null) {
             options.put("admin-url", adminUrl);
         }
-        options.put("generic", "true");
         options.put("service-url", serviceUrl);
         options.put("topic", topic);
         options.put("scan.startup.mode", scanStartupMode);
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index fa2aee68d3..4b48c4ece2 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -41,8 +42,9 @@ import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS
 /**
  * A collection class for handling metrics
  */
-public class SinkMetricData implements MetricData {
+public class SinkMetricData implements MetricData, Serializable {
 
+    private static final long serialVersionUID = 1L;
     private final MetricGroup metricGroup;
     private final Map<String, String> labels;
     private final RegisteredMetric registeredMetric;
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index ec64f53b39..0a0cad5d6e 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -43,10 +44,11 @@ import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS
 /**
  * A collection class for handling metrics
  */
-public class SourceMetricData implements MetricData {
+public class SourceMetricData implements MetricData, Serializable {
 
+    private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(SourceMetricData.class);
-    private final MetricGroup metricGroup;
+    private MetricGroup metricGroup;
     private final Map<String, String> labels;
     private Counter numRecordsIn;
     private Counter numBytesIn;
@@ -108,6 +110,16 @@ public class SourceMetricData implements MetricData {
         }
     }
 
+    public SourceMetricData(MetricOption option) {
+        this.labels = option.getLabels();
+
+        if (option.getIpPorts().isPresent()) {
+            AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+            this.auditOperator = AuditOperator.getInstance();
+            this.auditKeys = option.getInlongAuditKeys();
+        }
+    }
+
     /**
      * Default counter is {@link SimpleCounter}
      * groupId and streamId and nodeId are label value, user can use it filter 
metric data when use metric reporter
@@ -288,20 +300,23 @@ public class SourceMetricData implements MetricData {
 
     public void outputMetrics(long rowCountSize, long rowDataSize, long 
dataTime) {
         outputDefaultMetrics(rowCountSize, rowDataSize);
-
         if (auditOperator != null) {
             for (Integer key : auditKeys) {
                 auditOperator.add(
                         key,
                         getGroupId(),
                         getStreamId(),
-                        dataTime,
+                        getCurrentOrProvidedTime(dataTime),
                         rowCountSize,
                         rowDataSize);
             }
         }
     }
 
+    private long getCurrentOrProvidedTime(long dataTime) {
+        return dataTime == 0 ? System.currentTimeMillis() : dataTime;
+    }
+
     private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
         if (numRecordsIn != null) {
             this.numRecordsIn.inc(rowCountSize);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 79af4e0d43..dd06bfe758 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -39,7 +39,6 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
-import org.apache.pulsar.common.naming.TopicName;
 
 import javax.annotation.Nullable;
 
@@ -163,7 +162,7 @@ public class PulsarDynamicTableFactory
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         ReadableConfig tableOptions = helper.getOptions();
 
-        List<String> topics = generateTopic(context.getObjectIdentifier(), 
tableOptions);
+        List<String> topics = generateTopic(tableOptions);
         if (topics != null && !topics.isEmpty()) {
             ((Configuration) tableOptions).set(TOPIC, 
Collections.singletonList(topics.get(0)));
         }
@@ -231,7 +230,7 @@ public class PulsarDynamicTableFactory
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         ReadableConfig tableOptions = helper.getOptions();
 
-        List<String> topics = generateTopic(context.getObjectIdentifier(), 
tableOptions);
+        List<String> topics = generateTopic(tableOptions);
         if (topics != null && !topics.isEmpty()) {
             ((Configuration) tableOptions).set(TOPIC, 
Collections.singletonList(topics.get(0)));
         }
@@ -338,17 +337,8 @@ public class PulsarDynamicTableFactory
         return options;
     }
 
-    private List<String> generateTopic(ObjectIdentifier table, ReadableConfig 
tableOptions) {
-        List<String> topics = null;
-        if (tableOptions.get(GENERIC)) {
-            topics = tableOptions.getOptional(TOPIC).orElse(null);
-        } else {
-            String rawTopic = table.getDatabaseName() + "/" + 
table.getObjectName();
-            final String topic = TopicName.get(rawTopic).toString();
-            topics = Collections.singletonList(topic);
-        }
-
-        return topics;
+    private List<String> generateTopic(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 501d39c637..9fc0949b46 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -38,6 +38,12 @@
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-base</artifactId>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 4adc2e64ba..6784fc6790 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -47,6 +47,9 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat;
@@ -137,6 +140,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
         final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
         final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
         final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory =
                 new PulsarTableDeserializationSchemaFactory(
                         physicalDataType,
@@ -144,7 +151,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                         keyProjection,
                         valueDecodingFormat,
                         valueProjection,
-                        UPSERT_DISABLED);
+                        UPSERT_DISABLED,
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
 
         // Set default values for configuration not exposed to user.
         final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForMetadataPushdown =
@@ -190,8 +200,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 SINK_PARALLELISM,
                 KEY_FORMAT,
                 KEY_FIELDS,
-                EXPLICIT)
-                .collect(Collectors.toSet());
+                EXPLICIT,
+                AUDIT_KEYS,
+                INLONG_METRIC,
+                INLONG_AUDIT).collect(Collectors.toSet());
     }
 
     /** Format and Delivery guarantee related options are not forward options. 
*/
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 8377fce389..237de45b4d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,12 +56,15 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
 
     private final boolean upsertMode;
 
+    private SourceMetricData sourceMetricData;
+
     public PulsarTableDeserializationSchema(
             @Nullable DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo,
             PulsarRowDataConverter rowDataConverter,
-            boolean upsertMode) {
+            boolean upsertMode,
+            SourceMetricData sourceMetricData) {
         if (upsertMode) {
             checkNotNull(keyDeserialization, "upsert mode must specify a key 
format");
         }
@@ -67,6 +73,7 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
         this.rowDataConverter = checkNotNull(rowDataConverter);
         this.producedTypeInfo = checkNotNull(producedTypeInfo);
         this.upsertMode = upsertMode;
+        this.sourceMetricData = sourceMetricData;
     }
 
     @Override
@@ -96,7 +103,8 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
             return;
         }
 
-        valueDeserialization.deserialize(message.getData(), new 
ListCollector<>(valueRowData));
+        valueDeserialization.deserialize(message.getData(),
+                new MetricsCollector<>(new ListCollector<>(valueRowData), 
sourceMetricData));
 
         rowDataConverter.projectToProducedRowAndCollect(
                 message, keyRowData, valueRowData, collector);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index 671e82e0d3..360b07aa69 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
@@ -86,13 +89,21 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
 
     private final boolean upsertMode;
 
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+    private SourceMetricData sourceMetricData;
+
     public PulsarTableDeserializationSchemaFactory(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
             int[] keyProjection,
             DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
             int[] valueProjection,
-            boolean upsertMode) {
+            boolean upsertMode,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
         this.physicalDataType =
                 checkNotNull(physicalDataType, "field physicalDataType must 
not be null.");
         this.keyDecodingFormat = keyDecodingFormat;
@@ -105,6 +116,10 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
         this.producedDataType = physicalDataType;
         this.connectorMetadataKeys = Collections.emptyList();
         this.upsertMode = upsertMode;
+
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(
@@ -151,12 +166,23 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
                         readableMetadata,
                         upsertMode);
 
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
+
         return new PulsarTableDeserializationSchema(
                 keyDeserialization,
                 valueDeserialization,
                 producedTypeInfo,
                 rowDataConverter,
-                upsertMode);
+                upsertMode,
+                sourceMetricData);
     }
 
     public void setProducedDataType(DataType producedDataType) {

Reply via email to