This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5674bc24f8 [INLONG-10069][Sort] Support audit metrics for sort-connector-pulsar-1.18 (#10070) 5674bc24f8 is described below commit 5674bc24f846609fd39e78f6f203dfb629b30af0 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Apr 26 18:52:26 2024 +0800 [INLONG-10069][Sort] Support audit metrics for sort-connector-pulsar-1.18 (#10070) --- .../sort/pulsar/table/PulsarTableFactory.java | 15 +- .../sort/pulsar/table/PulsarTableOptionUtils.java | 1 + .../sort/pulsar/table/PulsarTableOptions.java | 1 + .../pulsar/table/PulsarTableValidationUtils.java | 4 +- .../source/PulsarTableDeserializationSchema.java | 132 +++++++++++ .../PulsarTableDeserializationSchemaFactory.java | 241 +++++++++++++++++++++ .../pulsar/table/source/PulsarTableSource.java | 227 +++++++++++++++++++ licenses/inlong-sort-connectors/LICENSE | 3 + 8 files changed, 620 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java index a6e7caa00e..4b982f4691 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory; +import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource; + import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; @@ -31,8 +34,6 
@@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory; import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink; -import org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory; -import org.apache.flink.connector.pulsar.table.source.PulsarTableSource; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; @@ -99,6 +100,7 @@ import static org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils * * <p>The main role of this class is to retrieve config options and validate options from config and * the table schema. It also sets default values if a config option is not present. + * Modify from {@link org.apache.flink.connector.pulsar.table.PulsarTableFactory} */ public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { @@ -154,6 +156,10 @@ public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTab final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); + String auditKeys = tableOptions.get(AUDIT_KEYS); + final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory = new PulsarTableDeserializationSchemaFactory( physicalDataType, @@ -161,7 +167,10 @@ public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTab keyProjection, valueDecodingFormat, valueProjection, - UPSERT_DISABLED); + UPSERT_DISABLED, + inlongMetric, + auditHostAndPorts, + auditKeys); // 
Set default values for configuration not exposed to user. final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForMetadataPushdown = diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java index 12ef459280..7ff329a660 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java @@ -80,6 +80,7 @@ import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMA * <li>Create key and value encoding/decoding format. * <li>Create key and value projection. * </ul> + * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils} */ public class PulsarTableOptionUtils { diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java index ae186dc739..bc78dd2d98 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java @@ -37,6 +37,7 @@ import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX; * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link * 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}. + * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptions} */ @PublicEvolving public final class PulsarTableOptions { diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java index 8dac83871a..68ee618fc3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java @@ -48,7 +48,9 @@ import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBS import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC; import static org.apache.pulsar.common.naming.TopicName.isValid; -/** Util class for source and sink validation rules. */ +/** Util class for source and sink validation rules. 
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils} + * */ public class PulsarTableValidationUtils { private PulsarTableValidationUtils() { diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java new file mode 100644 index 0000000000..13ec80862f --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.pulsar.table.source; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.apache.pulsar.client.api.Message; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A specific {@link PulsarDeserializationSchema} for {@link PulsarTableSource}. + * + * <p>Both Flink's key decoding format and value decoding format are wrapped in this class. It is + * responsible for getting metadata fields from a physical pulsar message body, and the final + * projection mapping from Pulsar message fields to Flink row. + * + * <p>After retrieving key and value bytes and convert them into a list of {@link RowData}, it then + * delegates metadata appending, key and value {@link RowData} combining to a {@link + * PulsarRowDataConverter} instance. 
+ * Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema} + */ +public class PulsarTableDeserializationSchema implements PulsarDeserializationSchema<RowData> { + + private static final long serialVersionUID = 1L; + + private final TypeInformation<RowData> producedTypeInfo; + + @Nullable + private final DeserializationSchema<RowData> keyDeserialization; + + private final DeserializationSchema<RowData> valueDeserialization; + + private final PulsarRowDataConverter rowDataConverter; + + private final boolean upsertMode; + + private SourceMetricData sourceMetricData; + + private MetricOption metricOption; + + public PulsarTableDeserializationSchema( + @Nullable DeserializationSchema<RowData> keyDeserialization, + DeserializationSchema<RowData> valueDeserialization, + TypeInformation<RowData> producedTypeInfo, + PulsarRowDataConverter rowDataConverter, + boolean upsertMode, + MetricOption metricOption) { + if (upsertMode) { + checkNotNull(keyDeserialization, "upsert mode must specify a key format"); + } + this.keyDeserialization = keyDeserialization; + this.valueDeserialization = checkNotNull(valueDeserialization); + this.rowDataConverter = checkNotNull(rowDataConverter); + this.producedTypeInfo = checkNotNull(producedTypeInfo); + this.upsertMode = upsertMode; + this.metricOption = metricOption; + } + + @Override + public void open(PulsarInitializationContext context, SourceConfiguration configuration) + throws Exception { + if (keyDeserialization != null) { + keyDeserialization.open(context); + } + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } + valueDeserialization.open(context); + } + + @Override + public void deserialize(Message<byte[]> message, Collector<RowData> collector) + throws IOException { + + // Get the key row data + List<RowData> keyRowData = new ArrayList<>(); + if (keyDeserialization != null) { + keyDeserialization.deserialize(message.getKeyBytes(), new 
ListCollector<>(keyRowData)); + } + + // Get the value row data + List<RowData> valueRowData = new ArrayList<>(); + + if (upsertMode && message.getData().length == 0) { + rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector); + return; + } + + valueDeserialization.deserialize(message.getData(), + new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData)); + + rowDataConverter.projectToProducedRowAndCollect( + message, keyRowData, valueRowData, collector); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return producedTypeInfo; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java new file mode 100644 index 0000000000..87ce2c7541 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.table.source; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata; +import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Contains key, value projection and format information, and use such information to create a + * {@link PulsarTableDeserializationSchema} instance used by runtime {@link + * org.apache.flink.connector.pulsar.source.PulsarSource} instance. + * + * <p>A Flink row fields has a strict order: Physical Fields (Key + value) + Format Metadata Fields + * Connector Metadata Fields. Physical Fields are fields come directly from Pulsar message body; + * Format Metadata Fields are from the extra information from the decoding format. 
Connector + * metadata fields are the ones most Pulsar messages have, such as publish time, message size and + * producer name. + * + * <p>In general, Physical fields + Format Metadata fields are contained in the RowData decoded + * using valueDecodingFormat. Only Connector Metadata fields needs to be appended to the decoded + * RowData. The tricky part is to put format metadata and connector metadata in the right location. + * This requires an explicit adjustment process. + * + * <p>For example, suppose Physical Fields (Key + value) + Format Metadata Fields + Connector + * Metadata Fields. has arity of 11, key projection is [0, 6], and physical value projection is [1, + * 2, 3, 4, 5], Then after the adjustment, key projection should be [0, 6], physical value + * projection should be [1, 2, 3, 4, 5] and format metadata projection should be [7], connector + * metadata projection should be [8, 9, 10]. + * Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory} + */ +public class PulsarTableDeserializationSchemaFactory implements Serializable { + + private static final long serialVersionUID = 1L; + + private final DataType physicalDataType; + + @Nullable + private final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat; + + private final int[] keyProjection; + + private final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat; + + private final int[] valueProjection; + + // -------------------------------------------------------------------------------------------- + // Mutable attributes. 
Will be updated after the applyReadableMetadata() + // -------------------------------------------------------------------------------------------- + private DataType producedDataType; + + private List<String> connectorMetadataKeys; + + private final boolean upsertMode; + + // audit related + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + private SourceMetricData sourceMetricData; + + public PulsarTableDeserializationSchemaFactory( + DataType physicalDataType, + @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, + int[] keyProjection, + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, + int[] valueProjection, + boolean upsertMode, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { + this.physicalDataType = + checkNotNull(physicalDataType, "field physicalDataType must not be null."); + this.keyDecodingFormat = keyDecodingFormat; + this.keyProjection = checkNotNull(keyProjection); + this.valueDecodingFormat = + checkNotNull(valueDecodingFormat, "field valueDecodingFormat must not be null."); + this.valueProjection = + checkNotNull(valueProjection, "field valueProjection must not be null."); + + this.producedDataType = physicalDataType; + this.connectorMetadataKeys = Collections.emptyList(); + this.upsertMode = upsertMode; + + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; + } + + private @Nullable DeserializationSchema<RowData> createDeserialization( + DynamicTableSource.Context context, + @Nullable DecodingFormat<DeserializationSchema<RowData>> format, + int[] projection, + @Nullable String prefix) { + if (format == null) { + return null; + } + + DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); + if (prefix != null) { + physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + } + return 
format.createRuntimeDecoder(context, physicalFormatDataType); + } + + public PulsarDeserializationSchema<RowData> createPulsarDeserialization( + ScanTableSource.ScanContext context) { + final DeserializationSchema<RowData> keyDeserialization = + createDeserialization(context, keyDecodingFormat, keyProjection, ""); + final DeserializationSchema<RowData> valueDeserialization = + createDeserialization(context, valueDecodingFormat, valueProjection, ""); + + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + final PulsarReadableMetadata readableMetadata = + new PulsarReadableMetadata(connectorMetadataKeys); + + // Get Physical Fields (key + value) + Format Metadata arity + final int physicalPlusFormatMetadataArity = + DataType.getFieldDataTypes(producedDataType).size() + - readableMetadata.getConnectorMetadataArity(); + final int[] physicalValuePlusFormatMetadataProjection = + adjustValueProjectionByAppendConnectorMetadata(physicalPlusFormatMetadataArity); + + final PulsarRowDataConverter rowDataConverter = + new PulsarRowDataConverter( + physicalPlusFormatMetadataArity, + keyProjection, + physicalValuePlusFormatMetadataProjection, + readableMetadata, + upsertMode); + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + + return new PulsarTableDeserializationSchema( + keyDeserialization, + valueDeserialization, + producedTypeInfo, + rowDataConverter, + upsertMode, + metricOption); + } + + public void setProducedDataType(DataType producedDataType) { + this.producedDataType = producedDataType; + } + + public void setConnectorMetadataKeys(List<String> metadataKeys) { + this.connectorMetadataKeys = metadataKeys; + } + + private int[] adjustValueProjectionByAppendConnectorMetadata( + int physicalValuePlusFormatMetadataArity) { + // Concat the Physical Fields (value only) with Format metadata projection. 
+ final int[] physicalValuePlusFormatMetadataProjection = + IntStream.concat( + IntStream.of(valueProjection), + IntStream.range( + keyProjection.length + valueProjection.length, + physicalValuePlusFormatMetadataArity)) + .toArray(); + return physicalValuePlusFormatMetadataProjection; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PulsarTableDeserializationSchemaFactory that = (PulsarTableDeserializationSchemaFactory) o; + return Objects.equals(physicalDataType, that.physicalDataType) + && Objects.equals(keyDecodingFormat, that.keyDecodingFormat) + && Arrays.equals(keyProjection, that.keyProjection) + && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) + && Arrays.equals(valueProjection, that.valueProjection) + && Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(connectorMetadataKeys, that.connectorMetadataKeys) + && Objects.equals(upsertMode, that.upsertMode); + } + + @Override + public int hashCode() { + int result = + Objects.hash( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + producedDataType, + connectorMetadataKeys, + upsertMode); + result = 31 * result + Arrays.hashCode(keyProjection); + result = 31 * result + Arrays.hashCode(valueProjection); + return result; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java new file mode 100644 index 0000000000..281e3eb5e9 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.table.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.pulsar.source.PulsarSource; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import 
java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ScanTableSource} implementation for Pulsar SQL Connector. It uses a {@link + * SourceProvider} so it doesn't need to support {@link + * org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown} interface. + * + * <p>{@link org.apache.flink.connector.pulsar.table.source.PulsarTableSource} + * Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableSource} + */ +public class PulsarTableSource implements ScanTableSource, SupportsReadingMetadata { + // -------------------------------------------------------------------------------------------- + // Format attributes + // -------------------------------------------------------------------------------------------- + + private static final String FORMAT_METADATA_PREFIX = "value."; + + private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory; + + /** + * Usually it is the same as the valueDecodingFormat, but use a different naming to show that it + * is used to list all the format metadata keys. 
+ */ + private final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata; + + private final ChangelogMode changelogMode; + + // -------------------------------------------------------------------------------------------- + // PulsarSource needed attributes + // -------------------------------------------------------------------------------------------- + + private final List<String> topics; + + private final Properties properties; + + private final StartCursor startCursor; + + private final StopCursor stopCursor; + + private final SubscriptionType subscriptionType; + + public PulsarTableSource( + PulsarTableDeserializationSchemaFactory deserializationSchemaFactory, + DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata, + ChangelogMode changelogMode, + List<String> topics, + Properties properties, + StartCursor startCursor, + StopCursor stopCursor, + SubscriptionType subscriptionType) { + // Format attributes + this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory); + this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata); + this.changelogMode = changelogMode; + // DataStream connector attributes + this.topics = topics; + this.properties = checkNotNull(properties); + this.startCursor = checkNotNull(startCursor); + this.stopCursor = checkNotNull(stopCursor); + this.subscriptionType = checkNotNull(subscriptionType); + } + + @Override + public ChangelogMode getChangelogMode() { + return changelogMode; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { + PulsarDeserializationSchema<RowData> deserializationSchema = + deserializationSchemaFactory.createPulsarDeserialization(context); + PulsarSource<RowData> source = + PulsarSource.builder() + .setTopics(topics) + .setStartCursor(startCursor) + .setUnboundedStopCursor(stopCursor) + .setDeserializationSchema(deserializationSchema) + .setProperties(properties) + .build(); + 
return SourceProvider.of(source); + } + + /** + * According to convention, the order of the final row must be PHYSICAL + FORMAT METADATA + + * CONNECTOR METADATA where the format metadata has the highest precedence. + * + * @return + */ + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> allMetadataMap = new LinkedHashMap<>(); + + // add value format metadata with prefix + decodingFormatForReadingMetadata + .listReadableMetadata() + .forEach((key, value) -> allMetadataMap.put(FORMAT_METADATA_PREFIX + key, value)); + // add connector metadata + Stream.of(PulsarReadableMetadata.ReadableMetadata.values()) + .forEachOrdered(m -> allMetadataMap.putIfAbsent(m.key, m.dataType)); + + return allMetadataMap; + } + + @Override + public void applyReadableMetadata(List<String> allMetadataKeys, DataType producedDataType) { + // separate connector and format metadata + final List<String> formatMetadataKeys = + allMetadataKeys.stream() + .filter(k -> k.startsWith(FORMAT_METADATA_PREFIX)) + .collect(Collectors.toList()); + + final List<String> connectorMetadataKeys = new ArrayList<>(allMetadataKeys); + connectorMetadataKeys.removeAll(formatMetadataKeys); + + // push down format metadata + final Map<String, DataType> formatMetadata = + decodingFormatForReadingMetadata.listReadableMetadata(); + if (formatMetadata.size() > 0) { + final List<String> requestedFormatMetadataKeys = + formatMetadataKeys.stream() + .map(k -> k.substring(FORMAT_METADATA_PREFIX.length())) + .collect(Collectors.toList()); + decodingFormatForReadingMetadata.applyReadableMetadata(requestedFormatMetadataKeys); + } + + // update the factory attributes. 
+ deserializationSchemaFactory.setConnectorMetadataKeys(connectorMetadataKeys); + deserializationSchemaFactory.setProducedDataType(producedDataType); + } + + @Override + public String asSummaryString() { + return "Pulsar table source"; + } + + @Override + public DynamicTableSource copy() { + return new PulsarTableSource( + deserializationSchemaFactory, + decodingFormatForReadingMetadata, + changelogMode, + topics, + properties, + startCursor, + stopCursor, + subscriptionType); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + ConcurrentHashMap map = new ConcurrentHashMap<>(); + if (o == null || getClass() != o.getClass()) { + return false; + } + PulsarTableSource that = (PulsarTableSource) o; + return Objects.equals(deserializationSchemaFactory, that.deserializationSchemaFactory) + && Objects.equals( + decodingFormatForReadingMetadata, that.decodingFormatForReadingMetadata) + && Objects.equals(changelogMode, that.changelogMode) + && Objects.equals(topics, that.topics) + && Objects.equals(properties, that.properties) + && Objects.equals(startCursor, that.startCursor) + && Objects.equals(stopCursor, that.stopCursor) + && subscriptionType == that.subscriptionType; + } + + @Override + public int hashCode() { + return Objects.hash( + deserializationSchemaFactory, + decodingFormatForReadingMetadata, + changelogMode, + topics, + properties, + startCursor, + stopCursor, + subscriptionType); + } +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index e0443b0fd6..5a13bc9fe5 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -835,6 +835,9 @@ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE