This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 41c3984877 [INLONG-10623][Sort] Fix the Pulsar connector on flink1.18 not set audit time (#10624)
41c3984877 is described below

commit 41c3984877e43bde8b1d1624ab4130ae883d31a1
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Mon Jul 15 14:15:09 2024 +0800

    [INLONG-10623][Sort] Fix the Pulsar connector on flink1.18 not set audit time (#10624)
---
 .../sort-connectors/pulsar/pom.xml                 |   5 +
 .../table/source/PulsarReadableMetadata.java       | 163 +++++++++++++++++++++
 .../table/source/PulsarRowDataConverter.java       | 132 +++++++++++++++++
 .../source/PulsarTableDeserializationSchema.java   |   9 +-
 .../PulsarTableDeserializationSchemaFactory.java   |   2 -
 .../pulsar/table/source/PulsarTableSource.java     |   1 -
 licenses/inlong-sort-connectors/LICENSE            |   2 +
 7 files changed, 307 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
index 6721922c29..79c94ad40f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
@@ -42,6 +42,11 @@
             <artifactId>pulsar-client-all</artifactId>
             <version>${pulsar.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connector-base</artifactId>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
new file mode 100644
index 0000000000..8b4e6a8a79
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.inlong.sort.pulsar.table.source.PulsarReadableMetadata.ReadableMetadata.CONSUME_TIME;
+
+/**
+ * Class for reading metadata fields from a Pulsar message and put in 
corresponding Flink row
+ * fields.
+ *
+ * <p>Contains list of readable metadata and provide util methods for metadata 
manipulation.
+ * Modify from  {@link 
org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata}
+ */
+public class PulsarReadableMetadata implements Serializable {
+
+    private static final long serialVersionUID = -4409932324481235973L;
+
+    private final List<String> connectorMetadataKeys;
+
+    private final List<MetadataConverter> metadataConverters;
+
+    public PulsarReadableMetadata(List<String> connectorMetadataKeys) {
+        this.connectorMetadataKeys = connectorMetadataKeys;
+        this.metadataConverters = initializeMetadataConverters();
+    }
+
+    private List<MetadataConverter> initializeMetadataConverters() {
+        return connectorMetadataKeys.stream()
+                .map(
+                        k -> Stream.of(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(m -> m.converter)
+                .collect(Collectors.toList());
+    }
+
+    public void appendProducedRowWithMetadata(
+            GenericRowData producedRowData, int physicalArity, Message<?> 
message, Collector<RowData> collector) {
+        for (int metadataPos = 0; metadataPos < metadataConverters.size(); 
metadataPos++) {
+            Object metadata = 
metadataConverters.get(metadataPos).read(message);
+            producedRowData.setField(
+                    physicalArity + metadataPos, metadata);
+            if 
(CONSUME_TIME.key.equals(connectorMetadataKeys.get(metadataPos)) &&
+                    collector instanceof MetricsCollector) {
+                ((MetricsCollector<RowData>) collector).resetTimestamp((Long) 
metadata);
+            }
+
+        }
+    }
+
+    public int getConnectorMetadataArity() {
+        return metadataConverters.size();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+    interface MetadataConverter extends Serializable {
+
+        Object read(Message<?> message);
+    }
+
+    /** Lists the metadata that is readable from a Pulsar message. Used in SQL 
source connector. */
+    public enum ReadableMetadata {
+
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getTopicName())),
+
+        MESSAGE_SIZE("message_size", DataTypes.INT().notNull(), Message::size),
+
+        PRODUCER_NAME(
+                "producer_name",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getProducerName())),
+
+        MESSAGE_ID(
+                "message_id",
+                DataTypes.BYTES().notNull(),
+                message -> message.getMessageId().toByteArray()),
+
+        SEQUENCE_ID("sequenceId", DataTypes.BIGINT().notNull(), 
Message::getSequenceId),
+
+        PUBLISH_TIME(
+                "publish_time",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> 
TimestampData.fromEpochMillis(message.getPublishTime())),
+
+        EVENT_TIME(
+                "event_time",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> 
TimestampData.fromEpochMillis(message.getEventTime())),
+
+        CONSUME_TIME(
+                ExtractNode.CONSUME_AUDIT_TIME,
+                DataTypes.BIGINT().notNull(),
+                message -> System.currentTimeMillis()),
+
+        PROPERTIES(
+                "properties",
+                // key and value of the map are nullable to make handling 
easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.STRING().nullable())
+                        .notNull(),
+                message -> {
+                    final Map<StringData, StringData> map = new HashMap<>();
+                    for (Map.Entry<String, String> e : 
message.getProperties().entrySet()) {
+                        map.put(
+                                StringData.fromString(e.getKey()),
+                                StringData.fromString(e.getValue()));
+                    }
+                    return new GenericMapData(map);
+                });
+
+        public final String key;
+
+        public final DataType dataType;
+
+        public final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
new file mode 100644
index 0000000000..44abbaf7c4
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Contains the projection information needed to map a Pulsar message to 
proper key fields, value
+ * fields and metadata fields.
+ * Modify from  {@link 
org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter}
+ */
+public class PulsarRowDataConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int physicalArity;
+
+    private final int[] keyProjection;
+
+    private final int[] valueProjection;
+
+    private final PulsarReadableMetadata readableMetadata;
+
+    private final boolean upsertMode;
+
+    public PulsarRowDataConverter(
+            int physicalArity,
+            int[] keyProjection,
+            int[] valueProjection,
+            PulsarReadableMetadata readableMetadata,
+            boolean upsertMode) {
+        this.physicalArity = physicalArity;
+        this.keyProjection = keyProjection;
+        this.valueProjection = valueProjection;
+        this.readableMetadata = readableMetadata;
+        this.upsertMode = upsertMode;
+    }
+
+    public void projectToProducedRowAndCollect(
+            Message<?> message,
+            List<RowData> keyRowDataList,
+            List<RowData> valueRowDataList,
+            Collector<RowData> collector) {
+        // no key defined
+        if (hasNoKeyProjection()) {
+            valueRowDataList.forEach(
+                    valueRow -> emitRow(null, (GenericRowData) valueRow, 
collector, message));
+        } else {
+            // otherwise emit a value for each key
+            valueRowDataList.forEach(
+                    valueRow -> keyRowDataList.forEach(
+                            keyRow -> emitRow(
+                                    (GenericRowData) keyRow,
+                                    (GenericRowData) valueRow,
+                                    collector,
+                                    message)));
+        }
+    }
+
+    public void projectToRowWithNullValueRow(
+            Message<?> message, List<RowData> keyRowDataList, 
Collector<RowData> collector) {
+        for (RowData keyRow : keyRowDataList) {
+            emitRow((GenericRowData) keyRow, null, collector, message);
+        }
+    }
+
+    private void emitRow(
+            @Nullable GenericRowData physicalKeyRow,
+            @Nullable GenericRowData physicalValueRow,
+            Collector<RowData> collector,
+            Message<?> message) {
+
+        final RowKind rowKind;
+        if (physicalValueRow == null) {
+            if (upsertMode) {
+                rowKind = RowKind.DELETE;
+            } else {
+                throw new DeserializationException(
+                        "Invalid null value received in non-upsert mode. Could 
not to set row kind for output record."
+                                + "upsert mode is not supported yet.");
+            }
+
+        } else {
+            rowKind = physicalValueRow.getRowKind();
+        }
+
+        final GenericRowData producedRow =
+                new GenericRowData(
+                        rowKind, physicalArity + 
readableMetadata.getConnectorMetadataArity());
+
+        for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+            producedRow.setField(valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+        }
+
+        for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+            assert physicalKeyRow != null;
+            producedRow.setField(keyProjection[keyPos], 
physicalKeyRow.getField(keyPos));
+        }
+
+        readableMetadata.appendProducedRowWithMetadata(producedRow, 
physicalArity, message, collector);
+        collector.collect(producedRow);
+    }
+
+    private boolean hasNoKeyProjection() {
+        return keyProjection.length == 0;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
index 13ec80862f..17466899d7 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 import org.apache.pulsar.client.api.Message;
@@ -117,12 +116,14 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
             rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, 
collector);
             return;
         }
+        MetricsCollector<RowData> metricsCollector =
+                new MetricsCollector<>(collector, sourceMetricData);
 
-        valueDeserialization.deserialize(message.getData(),
-                new MetricsCollector<>(new ListCollector<>(valueRowData), 
sourceMetricData));
+        valueDeserialization.deserialize(message.getData(), new 
ListCollector<>(valueRowData));
 
         rowDataConverter.projectToProducedRowAndCollect(
-                message, keyRowData, valueRowData, collector);
+                message, keyRowData, valueRowData, metricsCollector);
+
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
index 87ce2c7541..6698e1e12b 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
@@ -23,8 +23,6 @@ import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
-import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index 8864ca45c0..bf48356d26 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index b7c89b9245..c340f78668 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -851,6 +851,8 @@
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
        
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java

Reply via email to