This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5674bc24f8 [INLONG-10069][Sort] Support audit metrics for 
sort-connector-pulsar-1.18 (#10070)
5674bc24f8 is described below

commit 5674bc24f846609fd39e78f6f203dfb629b30af0
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Apr 26 18:52:26 2024 +0800

    [INLONG-10069][Sort] Support audit metrics for sort-connector-pulsar-1.18 
(#10070)
---
 .../sort/pulsar/table/PulsarTableFactory.java      |  15 +-
 .../sort/pulsar/table/PulsarTableOptionUtils.java  |   1 +
 .../sort/pulsar/table/PulsarTableOptions.java      |   1 +
 .../pulsar/table/PulsarTableValidationUtils.java   |   4 +-
 .../source/PulsarTableDeserializationSchema.java   | 132 +++++++++++
 .../PulsarTableDeserializationSchemaFactory.java   | 241 +++++++++++++++++++++
 .../pulsar/table/source/PulsarTableSource.java     | 227 +++++++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |   3 +
 8 files changed, 620 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index a6e7caa00e..4b982f4691 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import 
org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
+import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
@@ -31,8 +34,6 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
 import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
-import 
org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
-import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -99,6 +100,7 @@ import static 
org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils
  *
  * <p>The main role of this class is to retrieve config options and validate 
options from config and
  * the table schema. It also sets default values if a config option is not 
present.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableFactory}
  */
 public class PulsarTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
 
@@ -154,6 +156,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
         final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
         final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
         final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory =
                 new PulsarTableDeserializationSchemaFactory(
                         physicalDataType,
@@ -161,7 +167,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
                         keyProjection,
                         valueDecodingFormat,
                         valueProjection,
-                        UPSERT_DISABLED);
+                        UPSERT_DISABLED,
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
 
         // Set default values for configuration not exposed to user.
         final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForMetadataPushdown =
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
index 12ef459280..7ff329a660 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
@@ -80,6 +80,7 @@ import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMA
  *   <li>Create key and value encoding/decoding format.
  *   <li>Create key and value projection.
  * </ul>
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils}
  */
 public class PulsarTableOptionUtils {
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
index ae186dc739..bc78dd2d98 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -37,6 +37,7 @@ import static 
org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
  * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
  * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
  * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptions}
  */
 @PublicEvolving
 public final class PulsarTableOptions {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
index 8dac83871a..68ee618fc3 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
@@ -48,7 +48,9 @@ import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBS
 import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
 import static org.apache.pulsar.common.naming.TopicName.isValid;
 
-/** Util class for source and sink validation rules. */
+/** Util class for source and sink validation rules.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils}
+ */
 public class PulsarTableValidationUtils {
 
     private PulsarTableValidationUtils() {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
new file mode 100644
index 0000000000..13ec80862f
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link PulsarTableSource}.
+ *
+ * <p>Both Flink's key decoding format and value decoding format are wrapped in this class. It is
+ * responsible for getting metadata fields from a physical pulsar message body, and the final
+ * projection mapping from Pulsar message fields to Flink row.
+ *
+ * <p>After retrieving key and value bytes and converting them into a list of {@link RowData}, it
+ * delegates metadata appending and key/value {@link RowData} combining to a {@link
+ * PulsarRowDataConverter} instance.
+ *
+ * <p>Rows produced by the value format are collected through a {@link MetricsCollector} that is
+ * given the {@link SourceMetricData} created in {@code open} (only when a {@link MetricOption}
+ * was supplied to the constructor).
+ *
+ * <p>Modified from {@link
+ * org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema}
+ */
+public class PulsarTableDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    @Nullable
+    private final DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final PulsarRowDataConverter rowDataConverter;
+
+    private final boolean upsertMode;
+
+    /** Metric configuration; when {@code null}, no {@link SourceMetricData} is created. */
+    @Nullable
+    private final MetricOption metricOption;
+
+    /**
+     * Runtime-only metric handle, created in {@code open}. Marked transient so it is never
+     * written out together with the schema instance.
+     */
+    private transient SourceMetricData sourceMetricData;
+
+    /**
+     * @param keyDeserialization decoder for the message key; required when {@code upsertMode} is set
+     * @param valueDeserialization decoder for the message payload, must not be null
+     * @param producedTypeInfo type information returned by {@link #getProducedType()}
+     * @param rowDataConverter converter that combines key rows, value rows and metadata
+     * @param upsertMode whether an empty payload is projected as a row with a null value part
+     * @param metricOption metric configuration, may be null
+     */
+    public PulsarTableDeserializationSchema(
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo,
+            PulsarRowDataConverter rowDataConverter,
+            boolean upsertMode,
+            MetricOption metricOption) {
+        if (upsertMode) {
+            checkNotNull(keyDeserialization, "upsert mode must specify a key format");
+        }
+        this.keyDeserialization = keyDeserialization;
+        this.valueDeserialization = checkNotNull(valueDeserialization);
+        this.rowDataConverter = checkNotNull(rowDataConverter);
+        this.producedTypeInfo = checkNotNull(producedTypeInfo);
+        this.upsertMode = upsertMode;
+        this.metricOption = metricOption;
+    }
+
+    /**
+     * Opens the wrapped key (if any) and value decoders and, when a metric option is present,
+     * creates the {@link SourceMetricData} used by {@link #deserialize}.
+     */
+    @Override
+    public void open(PulsarInitializationContext context, SourceConfiguration configuration)
+            throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
+        valueDeserialization.open(context);
+    }
+
+    /**
+     * Decodes the key and value bytes of {@code message} and hands the resulting rows to the
+     * {@link PulsarRowDataConverter}, which emits the final rows to {@code collector}.
+     */
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<RowData> collector)
+            throws IOException {
+
+        // Get the key row data
+        final List<RowData> keyRowData = new ArrayList<>();
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
+        }
+
+        // In upsert mode an empty payload is projected from the key rows alone; the value
+        // decoder is not invoked, so no value list needs to be allocated for this path.
+        if (upsertMode && message.getData().length == 0) {
+            rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
+            return;
+        }
+
+        // Get the value row data; rows go through MetricsCollector with the current metric data
+        final List<RowData> valueRowData = new ArrayList<>();
+        valueDeserialization.deserialize(message.getData(),
+                new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData));
+
+        rowDataConverter.projectToProducedRowAndCollect(
+                message, keyRowData, valueRowData, collector);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
new file mode 100644
index 0000000000..87ce2c7541
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
+import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Contains key, value projection and format information, and uses such information to create a
+ * {@link PulsarTableDeserializationSchema} instance used by the runtime {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSource} instance.
+ *
+ * <p>A Flink row has a strict field order: Physical Fields (Key + value) + Format Metadata Fields
+ * + Connector Metadata Fields. Physical Fields are fields that come directly from the Pulsar
+ * message body; Format Metadata Fields come from the extra information of the decoding format.
+ * Connector Metadata Fields are the ones most Pulsar messages have, such as publish time, message
+ * size and producer name.
+ *
+ * <p>In general, Physical fields + Format Metadata fields are contained in the RowData decoded
+ * using valueDecodingFormat. Only Connector Metadata fields need to be appended to the decoded
+ * RowData. The tricky part is to put format metadata and connector metadata in the right location.
+ * This requires an explicit adjustment process.
+ *
+ * <p>For example, suppose Physical Fields (Key + value) + Format Metadata Fields + Connector
+ * Metadata Fields has arity of 11, key projection is [0, 6], and physical value projection is [1,
+ * 2, 3, 4, 5]. Then after the adjustment, key projection should be [0, 6], physical value
+ * projection should be [1, 2, 3, 4, 5], format metadata projection should be [7] and connector
+ * metadata projection should be [8, 9, 10].
+ *
+ * <p>Modified from {@link
+ * org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory}
+ */
+public class PulsarTableDeserializationSchemaFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+
+    @Nullable
+    private final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+    private final int[] keyProjection;
+
+    private final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    private final int[] valueProjection;
+
+    private final boolean upsertMode;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes. Will be updated after the applyReadableMetadata()
+    // --------------------------------------------------------------------------------------------
+    private DataType producedDataType;
+
+    private List<String> connectorMetadataKeys;
+
+    // audit related; handed to MetricOption.builder() in createPulsarDeserialization()
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+    // NOTE(review): never assigned or read in this class; kept transient so it cannot take part
+    // in serialization of the factory.
+    private transient SourceMetricData sourceMetricData;
+
+    /**
+     * @param physicalDataType physical row type of the table, must not be null
+     * @param keyDecodingFormat optional decoding format for the message key
+     * @param keyProjection indices of the key fields, must not be null
+     * @param valueDecodingFormat decoding format for the message payload, must not be null
+     * @param valueProjection indices of the value fields, must not be null
+     * @param upsertMode forwarded to the converter and the created deserialization schema
+     * @param inlongMetric value passed to {@code MetricOption.Builder#withInlongLabels}
+     * @param auditHostAndPorts value passed to {@code MetricOption.Builder#withAuditAddress}
+     * @param auditKeys value passed to {@code MetricOption.Builder#withAuditKeys}
+     */
+    public PulsarTableDeserializationSchemaFactory(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            int[] keyProjection,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] valueProjection,
+            boolean upsertMode,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this.physicalDataType =
+                checkNotNull(physicalDataType, "field physicalDataType must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.keyProjection = checkNotNull(keyProjection);
+        this.valueDecodingFormat =
+                checkNotNull(valueDecodingFormat, "field valueDecodingFormat must not be null.");
+        this.valueProjection =
+                checkNotNull(valueProjection, "field valueProjection must not be null.");
+
+        this.producedDataType = physicalDataType;
+        this.connectorMetadataKeys = Collections.emptyList();
+        this.upsertMode = upsertMode;
+
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    /**
+     * Creates the runtime decoder for the projected part of the physical row type.
+     *
+     * @return the decoder, or {@code null} when {@code format} is null
+     */
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            DynamicTableSource.Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+
+        DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    /**
+     * Builds the {@link PulsarTableDeserializationSchema} from the current projections, produced
+     * data type, connector metadata keys and audit settings.
+     */
+    public PulsarDeserializationSchema<RowData> createPulsarDeserialization(
+            ScanTableSource.ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, keyProjection, "");
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, "");
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final PulsarReadableMetadata readableMetadata =
+                new PulsarReadableMetadata(connectorMetadataKeys);
+
+        // Get Physical Fields (key + value) + Format Metadata arity
+        final int physicalPlusFormatMetadataArity =
+                DataType.getFieldDataTypes(producedDataType).size()
+                        - readableMetadata.getConnectorMetadataArity();
+        final int[] physicalValuePlusFormatMetadataProjection =
+                adjustValueProjectionByAppendConnectorMetadata(physicalPlusFormatMetadataArity);
+
+        final PulsarRowDataConverter rowDataConverter =
+                new PulsarRowDataConverter(
+                        physicalPlusFormatMetadataArity,
+                        keyProjection,
+                        physicalValuePlusFormatMetadataProjection,
+                        readableMetadata,
+                        upsertMode);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        return new PulsarTableDeserializationSchema(
+                keyDeserialization,
+                valueDeserialization,
+                producedTypeInfo,
+                rowDataConverter,
+                upsertMode,
+                metricOption);
+    }
+
+    public void setProducedDataType(DataType producedDataType) {
+        this.producedDataType = producedDataType;
+    }
+
+    public void setConnectorMetadataKeys(List<String> metadataKeys) {
+        this.connectorMetadataKeys = metadataKeys;
+    }
+
+    /**
+     * Concatenates the physical value projection with the format metadata indices, which span
+     * from {@code keyProjection.length + valueProjection.length} (inclusive) up to the given
+     * arity (exclusive).
+     */
+    private int[] adjustValueProjectionByAppendConnectorMetadata(
+            int physicalValuePlusFormatMetadataArity) {
+        return IntStream.concat(
+                IntStream.of(valueProjection),
+                IntStream.range(
+                        keyProjection.length + valueProjection.length,
+                        physicalValuePlusFormatMetadataArity))
+                .toArray();
+    }
+
+    /**
+     * Two factories are equal when all projection, format, metadata and audit settings match.
+     * The audit settings are included so that factories differing only in metric/audit
+     * configuration are not treated as interchangeable.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarTableDeserializationSchemaFactory that = (PulsarTableDeserializationSchemaFactory) o;
+        return upsertMode == that.upsertMode
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(connectorMetadataKeys, that.connectorMetadataKeys)
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+                && Objects.equals(auditKeys, that.auditKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        producedDataType,
+                        connectorMetadataKeys,
+                        upsertMode,
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
+        result = 31 * result + Arrays.hashCode(keyProjection);
+        result = 31 * result + Arrays.hashCode(valueProjection);
+        return result;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
new file mode 100644
index 0000000000..281e3eb5e9
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ScanTableSource} implementation for Pulsar SQL Connector. It uses 
a {@link
+ * SourceProvider} so it doesn't need to support {@link
+ * 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown} 
interface.
+ *
+ * <p>Modified from {@link
+ * org.apache.flink.connector.pulsar.table.source.PulsarTableSource}
+ */
+public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetadata {
+    // 
--------------------------------------------------------------------------------------------
+    // Format attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final String FORMAT_METADATA_PREFIX = "value.";
+
+    private final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory;
+
+    /**
+     * Usually it is the same as the valueDecodingFormat, but use a different 
naming to show that it
+     * is used to list all the format metadata keys.
+     */
+    private final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForReadingMetadata;
+
+    private final ChangelogMode changelogMode;
+
+    // 
--------------------------------------------------------------------------------------------
+    // PulsarSource needed attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private final List<String> topics;
+
+    private final Properties properties;
+
+    private final StartCursor startCursor;
+
+    private final StopCursor stopCursor;
+
+    private final SubscriptionType subscriptionType;
+
+    /**
+     * Creates the table source. Every argument except {@code changelogMode} and {@code topics}
+     * is rejected with a {@link NullPointerException} when null.
+     *
+     * @param deserializationSchemaFactory factory for the runtime deserialization schema
+     * @param decodingFormatForReadingMetadata format used to list and apply format metadata keys
+     * @param changelogMode changelog mode reported by {@link #getChangelogMode()}
+     * @param topics topics handed to the {@link PulsarSource} builder
+     * @param properties properties handed to the {@link PulsarSource} builder
+     * @param startCursor start cursor handed to the {@link PulsarSource} builder
+     * @param stopCursor stop cursor handed to the {@link PulsarSource} builder
+     * @param subscriptionType subscription type stored on this source
+     */
+    public PulsarTableSource(
+            PulsarTableDeserializationSchemaFactory deserializationSchemaFactory,
+            DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata,
+            ChangelogMode changelogMode,
+            List<String> topics,
+            Properties properties,
+            StartCursor startCursor,
+            StopCursor stopCursor,
+            SubscriptionType subscriptionType) {
+        // -- format side ------------------------------------------------------------------
+        this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory);
+        this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata);
+        this.changelogMode = changelogMode;
+
+        // -- DataStream connector side ----------------------------------------------------
+        this.topics = topics;
+        this.properties = checkNotNull(properties);
+        this.startCursor = checkNotNull(startCursor);
+        this.stopCursor = checkNotNull(stopCursor);
+        this.subscriptionType = checkNotNull(subscriptionType);
+    }
+
+    /** Returns the changelog mode this source was constructed with. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return this.changelogMode;
+    }
+
+    /**
+     * Builds a {@link PulsarSource} from the configured topics, cursors and properties, using a
+     * deserialization schema freshly created by the factory, and wraps it in a {@link
+     * SourceProvider}.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final PulsarDeserializationSchema<RowData> schema =
+                deserializationSchemaFactory.createPulsarDeserialization(context);
+
+        // NOTE(review): subscriptionType is held by this class but not passed to the builder here.
+        final PulsarSource<RowData> pulsarSource =
+                PulsarSource.builder()
+                        .setTopics(topics)
+                        .setStartCursor(startCursor)
+                        .setUnboundedStopCursor(stopCursor)
+                        .setDeserializationSchema(schema)
+                        .setProperties(properties)
+                        .build();
+
+        return SourceProvider.of(pulsarSource);
+    }
+
+    /**
+     * According to convention, the order of the final row must be PHYSICAL + FORMAT METADATA +
+     * CONNECTOR METADATA where the format metadata has the highest precedence.
+     *
+     * @return an insertion-ordered map holding the format metadata keys (prefixed with
+     *     {@code "value."}) followed by the connector metadata keys that are not already present
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> allMetadataMap = new LinkedHashMap<>();
+
+        // format metadata goes first, each key carrying the "value." prefix
+        final Map<String, DataType> formatMetadata =
+                decodingFormatForReadingMetadata.listReadableMetadata();
+        for (Map.Entry<String, DataType> entry : formatMetadata.entrySet()) {
+            allMetadataMap.put(FORMAT_METADATA_PREFIX + entry.getKey(), entry.getValue());
+        }
+
+        // connector metadata never overrides a key that is already in the map
+        Stream.of(PulsarReadableMetadata.ReadableMetadata.values())
+                .forEachOrdered(m -> allMetadataMap.putIfAbsent(m.key, m.dataType));
+
+        return allMetadataMap;
+    }
+
+    /**
+     * Splits the requested metadata keys into format keys (those starting with
+     * {@code FORMAT_METADATA_PREFIX}) and connector keys, pushes the format keys down to the
+     * decoding format with the prefix stripped, and records the connector keys and the produced
+     * data type on the deserialization schema factory.
+     *
+     * @param allMetadataKeys all requested metadata keys, format and connector keys mixed
+     * @param producedDataType the data type the source has to produce
+     */
+    @Override
+    public void applyReadableMetadata(List<String> allMetadataKeys, DataType producedDataType) {
+        // Partition the requested keys in one pass, preserving their relative order.
+        final List<String> prefixedFormatKeys = new ArrayList<>();
+        final List<String> connectorKeys = new ArrayList<>();
+        for (String metadataKey : allMetadataKeys) {
+            if (metadataKey.startsWith(FORMAT_METADATA_PREFIX)) {
+                prefixedFormatKeys.add(metadataKey);
+            } else {
+                connectorKeys.add(metadataKey);
+            }
+        }
+
+        // Push format metadata down only when the format exposes any metadata at all.
+        if (!decodingFormatForReadingMetadata.listReadableMetadata().isEmpty()) {
+            final int prefixLength = FORMAT_METADATA_PREFIX.length();
+            final List<String> strippedFormatKeys =
+                    prefixedFormatKeys.stream()
+                            .map(prefixedKey -> prefixedKey.substring(prefixLength))
+                            .collect(Collectors.toList());
+            decodingFormatForReadingMetadata.applyReadableMetadata(strippedFormatKeys);
+        }
+
+        // Hand the remaining state to the factory that builds the deserialization schema.
+        deserializationSchemaFactory.setConnectorMetadataKeys(connectorKeys);
+        deserializationSchemaFactory.setProducedDataType(producedDataType);
+    }
+
+    /**
+     * Returns a fixed, human-readable description of this table source.
+     *
+     * @return the constant string {@code "Pulsar table source"}
+     */
+    @Override
+    public String asSummaryString() {
+        return "Pulsar table source";
+    }
+
+    /**
+     * Creates a new {@code PulsarTableSource} carrying the same attributes as this one.
+     *
+     * <p>Every attribute is passed on by reference, so the copy uses the same deserialization
+     * schema factory and decoding format instances as this source.
+     *
+     * @return the newly created table source
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final PulsarTableSource duplicate =
+                new PulsarTableSource(
+                        deserializationSchemaFactory,
+                        decodingFormatForReadingMetadata,
+                        changelogMode,
+                        topics,
+                        properties,
+                        startCursor,
+                        stopCursor,
+                        subscriptionType);
+        return duplicate;
+    }
+
+    /**
+     * Compares this table source with another object.
+     *
+     * <p>Two sources are equal when they are of exactly the same class and all eight attributes
+     * (schema factory, decoding format, changelog mode, topics, properties, start cursor, stop
+     * cursor and subscription type) are equal.
+     *
+     * @param o the object to compare with, may be {@code null}
+     * @return {@code true} if {@code o} is an equal table source, {@code false} otherwise
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarTableSource that = (PulsarTableSource) o;
+        return Objects.equals(deserializationSchemaFactory, that.deserializationSchemaFactory)
+                && Objects.equals(
+                        decodingFormatForReadingMetadata, that.decodingFormatForReadingMetadata)
+                && Objects.equals(changelogMode, that.changelogMode)
+                && Objects.equals(topics, that.topics)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(startCursor, that.startCursor)
+                && Objects.equals(stopCursor, that.stopCursor)
+                // The subscription type is compared by reference.
+                && subscriptionType == that.subscriptionType;
+    }
+
+    /**
+     * Computes a hash code from the same eight attributes that {@link #equals(Object)} compares.
+     *
+     * @return the combined hash of all attributes
+     */
+    @Override
+    public int hashCode() {
+        // Listed in the same order as the constructor parameters.
+        final Object[] attributes = {
+            deserializationSchemaFactory,
+            decodingFormatForReadingMetadata,
+            changelogMode,
+            topics,
+            properties,
+            startCursor,
+            stopCursor,
+            subscriptionType
+        };
+        return Objects.hash(attributes);
+    }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index e0443b0fd6..5a13bc9fe5 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -835,6 +835,9 @@
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
   Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note 
that the software have been modified.)
   License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 


Reply via email to