This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 41c3984877 [INLONG-10623][Sort] Fix the Pulsar connector on flink1.18 not set audit time (#10624) 41c3984877 is described below commit 41c3984877e43bde8b1d1624ab4130ae883d31a1 Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Mon Jul 15 14:15:09 2024 +0800 [INLONG-10623][Sort] Fix the Pulsar connector on flink1.18 not set audit time (#10624) --- .../sort-connectors/pulsar/pom.xml | 5 + .../table/source/PulsarReadableMetadata.java | 163 +++++++++++++++++++++ .../table/source/PulsarRowDataConverter.java | 132 +++++++++++++++++ .../source/PulsarTableDeserializationSchema.java | 9 +- .../PulsarTableDeserializationSchemaFactory.java | 2 - .../pulsar/table/source/PulsarTableSource.java | 1 - licenses/inlong-sort-connectors/LICENSE | 2 + 7 files changed, 307 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml index 6721922c29..79c94ad40f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml @@ -42,6 +42,11 @@ <artifactId>pulsar-client-all</artifactId> <version>${pulsar.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>sort-connector-base</artifactId> diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java new file mode 100644 index 0000000000..8b4e6a8a79 --- /dev/null +++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.table.source; + +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.protocol.node.ExtractNode; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Collector; +import org.apache.pulsar.client.api.Message; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.inlong.sort.pulsar.table.source.PulsarReadableMetadata.ReadableMetadata.CONSUME_TIME; + +/** + * Class for reading metadata fields from a Pulsar message and put in corresponding Flink row + * fields. 
+ * <p>Contains the list of readable metadata and provides utility methods for metadata manipulation. + * Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata} + */ +public class PulsarReadableMetadata implements Serializable { + + private static final long serialVersionUID = -4409932324481235973L; + + /** Requested metadata keys; their values are appended after the physical fields in this order. */ + private final List<String> connectorMetadataKeys; + + /** Converters aligned index-by-index with {@link #connectorMetadataKeys}. */ + private final List<MetadataConverter> metadataConverters; + + public PulsarReadableMetadata(List<String> connectorMetadataKeys) { + this.connectorMetadataKeys = connectorMetadataKeys; + this.metadataConverters = initializeMetadataConverters(); + } + + /** + * Resolves each requested key to the converter of the {@link ReadableMetadata} with that key. + * + * @throws IllegalStateException if a key does not match any {@link ReadableMetadata} + */ + private List<MetadataConverter> initializeMetadataConverters() { + return connectorMetadataKeys.stream() + .map( + k -> Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(() -> new IllegalStateException( + "Unsupported Pulsar readable metadata key: " + k))) + .map(m -> m.converter) + .collect(Collectors.toList()); + } + + /** + * Reads every requested metadata value from {@code message} and stores it in + * {@code producedRowData}, starting at position {@code physicalArity}. + * + * <p>If the consume-time audit metadata is among the requested keys and {@code collector} is a + * {@link MetricsCollector}, the collector's timestamp is reset to the consume time written into + * the row, so the audit time matches the emitted record. + * + * @param producedRowData row to fill; must have room for {@code physicalArity} + metadata fields + * @param physicalArity number of physical fields preceding the metadata fields + * @param message Pulsar message the metadata is read from + * @param collector downstream collector of the produced row + */ + @SuppressWarnings("unchecked") + public void appendProducedRowWithMetadata( + GenericRowData producedRowData, int physicalArity, Message<?> message, Collector<RowData> collector) { + for (int metadataPos = 0; metadataPos < metadataConverters.size(); metadataPos++) { + Object metadata = metadataConverters.get(metadataPos).read(message); + producedRowData.setField( + physicalArity + metadataPos, metadata); + if (CONSUME_TIME.key.equals(connectorMetadataKeys.get(metadataPos)) && + collector instanceof MetricsCollector) { + // instanceof guarantees the raw type; the type argument cannot be checked at runtime + ((MetricsCollector<RowData>) collector).resetTimestamp((Long) metadata); + } + } + } + + /** Returns the number of metadata fields appended after the physical fields. */ + public int getConnectorMetadataArity() { + return metadataConverters.size(); + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + interface MetadataConverter extends Serializable { + + Object read(Message<?> message); + } + + /** Lists the metadata that is readable from a Pulsar
message. Used in SQL source connector. */ + public enum ReadableMetadata { + + TOPIC( + "topic", + DataTypes.STRING().notNull(), + message -> StringData.fromString(message.getTopicName())), + + MESSAGE_SIZE("message_size", DataTypes.INT().notNull(), Message::size), + + PRODUCER_NAME( + "producer_name", + DataTypes.STRING().notNull(), + message -> StringData.fromString(message.getProducerName())), + + MESSAGE_ID( + "message_id", + DataTypes.BYTES().notNull(), + message -> message.getMessageId().toByteArray()), + + SEQUENCE_ID("sequenceId", DataTypes.BIGINT().notNull(), Message::getSequenceId), + + PUBLISH_TIME( + "publish_time", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), + message -> TimestampData.fromEpochMillis(message.getPublishTime())), + + EVENT_TIME( + "event_time", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), + message -> TimestampData.fromEpochMillis(message.getEventTime())), + + // system time (epoch millis) at which the row is produced; also reset on the MetricsCollector + CONSUME_TIME( + ExtractNode.CONSUME_AUDIT_TIME, + DataTypes.BIGINT().notNull(), + message -> System.currentTimeMillis()), + + PROPERTIES( + "properties", + // key and value of the map are nullable to make handling easier in queries + DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()) + .notNull(), + message -> { + final Map<StringData, StringData> map = new HashMap<>(); + for (Map.Entry<String, String> e : message.getProperties().entrySet()) { + map.put( + StringData.fromString(e.getKey()), + StringData.fromString(e.getValue())); + } + return new GenericMapData(map); + }); + + public final String key; + + public final DataType dataType; + + public final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java new file mode 100644 index 0000000000..44abbaf7c4 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.table.source; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.DeserializationException; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.pulsar.client.api.Message; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.List; + +/** + * Contains the projection information needed to map a Pulsar message to proper key fields, value + * fields and metadata fields. 
+ * Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter} + */ +public class PulsarRowDataConverter implements Serializable { + + private static final long serialVersionUID = 1L; + + private final int physicalArity; + + private final int[] keyProjection; + + private final int[] valueProjection; + + private final PulsarReadableMetadata readableMetadata; + + private final boolean upsertMode; + + public PulsarRowDataConverter( + int physicalArity, + int[] keyProjection, + int[] valueProjection, + PulsarReadableMetadata readableMetadata, + boolean upsertMode) { + this.physicalArity = physicalArity; + this.keyProjection = keyProjection; + this.valueProjection = valueProjection; + this.readableMetadata = readableMetadata; + this.upsertMode = upsertMode; + } + + /** + * Combines the deserialized key and value rows of {@code message} into produced rows and emits + * them to {@code collector}. + * + * <p>Without key projection one row is emitted per value row; otherwise one row is emitted for + * every (value row, key row) pair. + */ + public void projectToProducedRowAndCollect( + Message<?> message, + List<RowData> keyRowDataList, + List<RowData> valueRowDataList, + Collector<RowData> collector) { + // no key defined + if (hasNoKeyProjection()) { + valueRowDataList.forEach( + valueRow -> emitRow(null, (GenericRowData) valueRow, collector, message)); + } else { + // otherwise emit a value for each key + valueRowDataList.forEach( + valueRow -> keyRowDataList.forEach( + keyRow -> emitRow( + (GenericRowData) keyRow, + (GenericRowData) valueRow, + collector, + message))); + } + } + + /** + * Emits one row per key row when no value row is available. In upsert mode each row is a + * {@link RowKind#DELETE}; outside upsert mode the first key row raises a + * {@link DeserializationException}. + */ + public void projectToRowWithNullValueRow( + Message<?> message, List<RowData> keyRowDataList, Collector<RowData> collector) { + for (RowData keyRow : keyRowDataList) { + emitRow((GenericRowData) keyRow, null, collector, message); + } + }
+ + /** + * Builds a produced row from the physical key/value rows plus the readable metadata of + * {@code message} and emits it to {@code collector}. + * + * @throws DeserializationException if {@code physicalValueRow} is null outside upsert mode + */ + private void emitRow( + @Nullable GenericRowData physicalKeyRow, + @Nullable GenericRowData physicalValueRow, + Collector<RowData> collector, + Message<?> message) { + + final RowKind rowKind; + if (physicalValueRow == null) { + if (upsertMode) { + // in upsert mode a missing value is emitted as a deletion of the key + rowKind = RowKind.DELETE; + } else { + throw new DeserializationException( + "Invalid null value received in non-upsert mode. Could not set row kind for output record. " + + "Upsert mode is not supported yet."); + } + + } else { + rowKind = physicalValueRow.getRowKind(); + } + + final GenericRowData producedRow = + new GenericRowData( + rowKind, physicalArity + readableMetadata.getConnectorMetadataArity()); + + // a DELETE row has no value row: leave the value fields null instead of dereferencing it + if (physicalValueRow != null) { + for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { + producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos)); + } + } + + // key fields are written last, so they win where key and value projections overlap + for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) { + assert physicalKeyRow != null; + producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos)); + } + + readableMetadata.appendProducedRowWithMetadata(producedRow, physicalArity, message, collector); + collector.collect(producedRow); + } + + private boolean hasNoKeyProjection() { + return keyProjection.length == 0; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java index 13ec80862f..17466899d7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; -import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter; import
org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; import org.apache.pulsar.client.api.Message; @@ -117,12 +116,14 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector); return; } + MetricsCollector<RowData> metricsCollector = + new MetricsCollector<>(collector, sourceMetricData); - valueDeserialization.deserialize(message.getData(), - new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData)); + valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData)); rowDataConverter.projectToProducedRowAndCollect( - message, keyRowData, valueRowData, collector); + message, keyRowData, valueRowData, metricsCollector); + } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java index 87ce2c7541..6698e1e12b 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java @@ -23,8 +23,6 @@ import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; -import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata; -import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter; 
import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java index 8864ca45c0..bf48356d26 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; -import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index b7c89b9245..c340f78668 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -851,6 +851,8 @@ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java