This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e78b239ab7 [INLONG-10682][Sort] Make pulsar source support to send audit information exactly once and add ExtractNode.INLONG_MSG to helper validate (#10686) e78b239ab7 is described below commit e78b239ab793641b3ebf67e851020b291b0aece9 Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Mon Jul 22 17:38:32 2024 +0800 [INLONG-10682][Sort] Make pulsar source support to send audit information exactly once and add ExtractNode.INLONG_MSG to helper validate (#10686) --- .../inlong/sort/pulsar/source/PulsarSource.java | 187 ++++++ .../sort/pulsar/source/PulsarSourceBuilder.java | 647 +++++++++++++++++++++ .../pulsar/source/reader/PulsarSourceReader.java | 314 ++++++++++ .../sort/pulsar/table/PulsarTableFactory.java | 4 +- .../source/PulsarTableDeserializationSchema.java | 26 +- .../pulsar/table/source/PulsarTableSource.java | 5 +- licenses/inlong-sort-connectors/LICENSE | 3 + 7 files changed, 1179 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java new file mode 100644 index 0000000000..e26ee645aa --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.source; + +import org.apache.inlong.sort.pulsar.source.reader.PulsarSourceReader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a + * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records + * of <code>String</code> type. + * + * <pre>{@code + * PulsarSource<String> source = PulsarSource + * .builder() + * .setTopics(TOPIC1, TOPIC2) + * .setServiceUrl(getServiceUrl()) + * .setSubscriptionName("test") + * .setDeserializationSchema(new SimpleStringSchema()) + * .setBounded(StopCursor::defaultStopCursor) + * .build(); + * }</pre> + * + * <p>See {@link PulsarSourceBuilder} for more details. + * + * @param <OUT> The output type of the source. + * Modify from {@link org.apache.flink.connector.pulsar.source.PulsarSource} + */ +@PublicEvolving +public final class PulsarSource<OUT> + implements + Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>, + ResultTypeQueryable<OUT> { + + private static final long serialVersionUID = 7773108631275567433L; + + /** + * The configuration for pulsar source, we don't support the pulsar's configuration class + * directly. + */ + private final SourceConfiguration sourceConfiguration; + + private final PulsarSubscriber subscriber; + + private final RangeGenerator rangeGenerator; + + private final StartCursor startCursor; + + private final StopCursor stopCursor; + + private final Boundedness boundedness; + + /** The pulsar deserialization schema is used for deserializing message. */ + private final PulsarDeserializationSchema<OUT> deserializationSchema; + + private final PulsarCrypto pulsarCrypto; + + /** + * The constructor for PulsarSource, it's package protected for forcing using {@link + * PulsarSourceBuilder}. 
+ */ + PulsarSource( + SourceConfiguration sourceConfiguration, + PulsarSubscriber subscriber, + RangeGenerator rangeGenerator, + StartCursor startCursor, + StopCursor stopCursor, + Boundedness boundedness, + PulsarDeserializationSchema<OUT> deserializationSchema, + PulsarCrypto pulsarCrypto) { + this.sourceConfiguration = sourceConfiguration; + this.subscriber = subscriber; + this.rangeGenerator = rangeGenerator; + this.startCursor = startCursor; + this.stopCursor = stopCursor; + this.boundedness = boundedness; + this.deserializationSchema = deserializationSchema; + this.pulsarCrypto = pulsarCrypto; + } + + /** + * Get a PulsarSourceBuilder to build a {@link PulsarSource}. + * + * @return a Pulsar source builder. + */ + public static <OUT> PulsarSourceBuilder<OUT> builder() { + return new PulsarSourceBuilder<>(); + } + + @Override + public Boundedness getBoundedness() { + return boundedness; + } + + @Internal + @Override + public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return PulsarSourceReader.create( + sourceConfiguration, deserializationSchema, pulsarCrypto, readerContext); + } + + @Internal + @Override + public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator( + SplitEnumeratorContext<PulsarPartitionSplit> enumContext) throws PulsarClientException { + return new PulsarSourceEnumerator( + subscriber, + startCursor, + stopCursor, + rangeGenerator, + sourceConfiguration, + enumContext); + } + + @Internal + @Override + public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator( + SplitEnumeratorContext<PulsarPartitionSplit> enumContext, + PulsarSourceEnumState checkpoint) + throws PulsarClientException { + return new PulsarSourceEnumerator( + subscriber, + startCursor, + stopCursor, + rangeGenerator, + sourceConfiguration, + enumContext, + checkpoint); + } + + @Internal + @Override + public SimpleVersionedSerializer<PulsarPartitionSplit>
getSplitSerializer() { + return PulsarPartitionSplitSerializer.INSTANCE; + } + + @Internal + @Override + public SimpleVersionedSerializer<PulsarSourceEnumState> getEnumeratorCheckpointSerializer() { + return PulsarSourceEnumStateSerializer.INSTANCE; + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java new file mode 100644 index 0000000000..cfabe671a4 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializer; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaWrapper; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper; +import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeInformationWrapper; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR; +import static org.apache.flink.util.InstantiationUtil.isSerializable; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The 
builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. + * + * <pre>{@code + * PulsarSource<String> source = PulsarSource + * .builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(new SimpleStringSchema()) + * .build(); + * }</pre> + * + * <p>The service url, subscription name, topics to consume, and the record deserializer are + * required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursor(StartCursor)}. + * + * <p>By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and + * never stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stop at some given offsets, one can call {@link + * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below. For + * example, the following PulsarSource stops after it consumes up to an event time equal to the + * moment the Flink job started. + * + * <p>To stop the connector, the user has to disable auto partition discovery, because auto partition + * discovery always expects new splits to arrive and therefore never exits. To disable auto partition + * discovery, use builder.setConfig({@link + * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * + * <pre>{@code + * PulsarSource<String> source = PulsarSource + * .builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(new SimpleStringSchema()) + * .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <OUT> The output type of the source. + * Modify from {@link org.apache.flink.connector.pulsar.source.PulsarSourceBuilder} + */ +@PublicEvolving +public final class PulsarSourceBuilder<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final PulsarConfigBuilder configBuilder; + + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private StartCursor startCursor; + private StopCursor stopCursor; + private Boundedness boundedness; + private PulsarDeserializationSchema<OUT> deserializationSchema; + private PulsarCrypto pulsarCrypto; + + // private builder constructor. + PulsarSourceBuilder() { + this.configBuilder = new PulsarConfigBuilder(); + this.startCursor = StartCursor.defaultStartCursor(); + this.stopCursor = StopCursor.defaultStopCursor(); + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + * @deprecated this method will return builder directly + */ + @Deprecated + public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) { + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) { + return setConfig(PULSAR_SERVICE_URL, serviceUrl); + } + + /** + * Sets the name for this pulsar subscription. 
+ * + * @param subscriptionName the name of the Pulsar subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName) { + return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + } + + /** + * Set a pulsar topic list for the flink source. Some topics may not exist yet; consuming + * a non-existent topic won't throw any exception. But the best solution is just consuming + * by using a topic regex. You can set topics once either with {@link #setTopics} or {@link + * #setTopicPattern} in this builder. + * + * @param topics The list of topics you would like to consume messages from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopics(String... topics) { + return setTopics(Arrays.asList(topics)); + } + + /** + * Set a pulsar topic list for the flink source. Some topics may not exist currently, consuming + * this non-existed topic wouldn't throw any exception. But the best solution is just consuming + * by using a topic regex. You can set topics once either with {@link #setTopics} or {@link + * #setTopicPattern} in this builder. + * + * @param topics The topic list you would like to consume messages. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopics(List<String> topics) { + ensureSubscriberIsNull("topics"); + List<String> distinctTopics = TopicNameUtils.distinctTopics(topics); + this.subscriber = PulsarSubscriber.getTopicListSubscriber(distinctTopics); + return this; + } + + /** + * Set a topic pattern to consume from the java regex str. You can set topics once either with + * {@link #setTopics} or {@link #setTopicPattern} in this builder. + * + * <p>Remember that we will only subscribe to one tenant and one namespace by using regular + * expression. If you didn't provide the tenant and namespace in the given topic pattern. We + * will use default one instead.
+ * + * @param topicsPattern the pattern of the topic name to consume from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern) { + return setTopicPattern(Pattern.compile(topicsPattern)); + } + + /** + * Set a topic pattern to consume from the java {@link Pattern}. You can set topics once either + * with {@link #setTopics} or {@link #setTopicPattern} in this builder. + * + * <p>Remember that we will only subscribe to one tenant and one namespace by using regular + * expression. If you didn't provide the tenant and namespace in the given topic pattern. We + * will use default one instead. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern) { + return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics); + } + + /** + * Set a topic pattern to consume from the java regex str. You can set topics once either with + * {@link #setTopics} or {@link #setTopicPattern} in this builder. + * + * <p>Remember that we will only subscribe to one tenant and one namespace by using regular + * expression. If you didn't provide the tenant and namespace in the given topic pattern. We + * will use default one instead. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode The topic filter for regex subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopicPattern( + String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + return setTopicPattern(Pattern.compile(topicsPattern), regexSubscriptionMode); + } + + /** + * Set a topic pattern to consume from the java {@link Pattern}. You can set topics once either + * with {@link #setTopics} or {@link #setTopicPattern} in this builder. 
+ * + * <p>Remember that we will only subscribe to one tenant and one namespace by using regular + * expression. If you didn't provide the tenant and namespace in the given topic pattern. We + * will use default one instead. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can + * pick a certain type of topic. + * <ul> + * <li>PersistentOnly: only subscribe to persistent topics. + * <li>NonPersistentOnly: only subscribe to non-persistent topics. + * <li>AllTopics: subscribe to both persistent and non-persistent topics. + * </ul> + * + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setTopicPattern( + Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + ensureSubscriberIsNull("topic pattern"); + this.subscriber = + PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode); + return this; + } + + /** + * The consumer name is informative, and it can be used to identify a particular consumer + * instance from the topic stats. + */ + public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) { + return setConfig(PULSAR_CONSUMER_NAME, consumerName); + } + + /** + * If you enable this option, we would consume and deserialize the message by using Pulsar + * {@link Schema}. + */ + public PulsarSourceBuilder<OUT> enableSchemaEvolution() { + configBuilder.set(PULSAR_READ_SCHEMA_EVOLUTION, true); + return this; + } + + /** + * Set a topic range generator for consuming a sub set of keys. + * + * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given + * topic. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator) { + this.rangeGenerator = checkNotNull(rangeGenerator); + return this; + } + + /** + * Specify from which offsets the PulsarSource should start consume from by providing an {@link + * StartCursor}. + * + * @param startCursor set the starting offsets for the Source. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) { + this.startCursor = checkNotNull(startCursor); + return this; + } + + /** + * By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and + * never stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below. + * + * <p>This method is different from {@link #setBoundedStopCursor(StopCursor)} that after setting + * the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will still + * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping + * offsets specified by the stopping offsets {@link StopCursor}. + * + * <p>To stop the connector user has to disable the auto partition discovery. As auto partition + * discovery always expected new splits to come and not exiting. To disable auto partition + * discovery, use builder.setConfig({@link + * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1). + * + * @param stopCursor The {@link StopCursor} to specify the stopping offset. + * @return this PulsarSourceBuilder. 
+ * @see #setBoundedStopCursor(StopCursor) + */ + public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) { + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + this.stopCursor = checkNotNull(stopCursor); + return this; + } + + /** + * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in + * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor} + * to specify the stopping offsets for each partition. When all the partitions have reached + * their stopping offsets, the PulsarSource will then exit. + * + * <p>This method is different from {@link #setUnboundedStopCursor(StopCursor)} that after + * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will + * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}. + * + * @param stopCursor the {@link StopCursor} to specify the stopping offsets. + * @return this PulsarSourceBuilder. + * @see #setUnboundedStopCursor(StopCursor) + */ + public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) { + this.boundedness = Boundedness.BOUNDED; + this.stopCursor = checkNotNull(stopCursor); + return this; + } + + /** + * Deserialize messages from Pulsar by using the flink's {@link DeserializationSchema}. It would + * consume the pulsar message as a byte array and decode the message by using flink's logic. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + DeserializationSchema<T> deserializationSchema) { + return setDeserializationSchema( + new PulsarDeserializationSchemaWrapper<>(deserializationSchema)); + } + + /** + * Deserialize the messages from Pulsar by using {@link Schema#AUTO_CONSUME()}. It will turn the + * pulsar message into a {@link GenericRecord} first. 
Using this method can consume the messages + * with multiple schemas in the same topic. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + GenericRecordDeserializer<T> deserializer) { + return setDeserializationSchema(new GenericRecordDeserializationSchema<>(deserializer)); + } + + /** + * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would + * consume the pulsar message as a byte array and decode the message by using flink's logic. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive + * types</a> here. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(Schema<T> schema) { + ensureSchemaTypeIsValid(schema); + return setDeserializationSchema(new PulsarSchemaWrapper<>(schema)); + } + + /** + * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would + * consume the pulsar message as a byte array and decode the message by using flink's logic. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + Schema<T> schema, Class<T> typeClass) { + ensureSchemaTypeIsValid(schema); + return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, typeClass)); + } + + /** + * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would + * consume the pulsar message as a byte array and decode the message by using flink's logic. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here. 
+ */ + public <K, V, T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) { + ensureSchemaTypeIsValid(schema); + return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, keyClass, valueClass)); + } + + /** + * Deserialize messages from Pulsar by using the flink's {@link TypeInformation}. This method is + * only used for treating messages that were written into pulsar by {@link TypeInformation}. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + TypeInformation<T> information, ExecutionConfig config) { + return setDeserializationSchema(new PulsarTypeInformationWrapper<>(information, config)); + } + + /** + * PulsarDeserializationSchema is required for deserializing messages from Pulsar and getting + * the {@link TypeInformation} for message serialization in flink. + */ + public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema( + PulsarDeserializationSchema<T> deserializationSchema) { + PulsarSourceBuilder<T> self = specialized(); + self.deserializationSchema = deserializationSchema; + return self; + } + + /** + * Sets a {@link PulsarCrypto}. Configure the key reader and keys to be used to encrypt the + * message payloads. + * + * @param pulsarCrypto PulsarCrypto object. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setPulsarCrypto( + PulsarCrypto pulsarCrypto, ConsumerCryptoFailureAction action) { + this.pulsarCrypto = checkNotNull(pulsarCrypto); + configBuilder.set(PULSAR_CRYPTO_FAILURE_ACTION, action); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParamsString string which represents parameters for the Authentication-Plugin, + * e.g., "key1:val1,key2:val2" + * @return this PulsarSourceBuilder.
+ */ + public PulsarSourceBuilder<OUT> setAuthentication( + String authPluginClassName, String authParamsString) { + checkArgument( + !configBuilder.contains(PULSAR_AUTH_PARAM_MAP), + "Duplicated authentication setting."); + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParams map which represents parameters for the Authentication-Plugin + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setAuthentication( + String authPluginClassName, Map<String, String> authParams) { + checkArgument( + !configBuilder.contains(PULSAR_AUTH_PARAMS), "Duplicated authentication setting."); + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams); + return this; + } + + /** + * Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be + * found in {@link PulsarSourceOptions} and {@link PulsarOptions}. + * + * <p>Make sure the option could be set only once or with same value. + * + * @param key the key of the property. + * @param value the value of the property. + * @return this PulsarSourceBuilder. + */ + public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value) { + configBuilder.set(key, value); + return this; + } + + /** + * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be + * found in {@link PulsarSourceOptions} and {@link PulsarOptions}. + * + * @param config the config to set for the PulsarSource. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<OUT> setConfig(Configuration config) { + configBuilder.set(config); + return this; + } + + /** + * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be + * found in {@link PulsarSourceOptions} and {@link PulsarOptions}. + * + * <p>This method is mainly used for future flink SQL binding. + * + * @param properties the config properties to set for the PulsarSource. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setProperties(Properties properties) { + configBuilder.set(properties); + return this; + } + + /** + * Build the {@link PulsarSource}. + * + * @return a PulsarSource with the settings made for this builder. + */ + @SuppressWarnings("java:S3776") + public PulsarSource<OUT> build() { + // Ensure the topic subscriber for pulsar. + checkNotNull(subscriber, "No topic names or topic pattern are provided."); + + if (rangeGenerator == null) { + LOG.warn( + "No range generator provided, we would use the FullRangeGenerator as the default range generator."); + this.rangeGenerator = new FullRangeGenerator(); + } + + if (boundedness == null) { + LOG.warn("No boundedness was set, mark it as a endless stream."); + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + } + if (boundedness == Boundedness.BOUNDED + && configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) { + LOG.warn( + "{} property is overridden to -1 because the source is bounded.", + PULSAR_PARTITION_DISCOVERY_INTERVAL_MS); + configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L); + } + + checkNotNull(deserializationSchema, "deserializationSchema should be set."); + // Schema evolution validation. 
+ if (Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION))) { + checkState( + deserializationSchema instanceof PulsarSchemaWrapper, + "When enabling schema evolution, you must provide a Pulsar Schema in builder's setDeserializationSchema method."); + } else if (deserializationSchema instanceof PulsarSchemaWrapper) { + LOG.info( + "It seems like you are consuming messages by using Pulsar Schema." + + " You can builder.enableSchemaEvolution() to enable schema evolution for better Pulsar Schema check." + + " We would use bypass Schema check by default."); + } + + if (pulsarCrypto == null) { + this.pulsarCrypto = PulsarCrypto.disabled(); + } + + if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) { + LOG.warn( + "We recommend set a readable consumer name through setConsumerName(String) in production mode."); + } else { + String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME); + if (!consumerName.contains("%s")) { + configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s"); + } + } + + // Make sure they are serializable. + checkState( + isSerializable(deserializationSchema), + "PulsarDeserializationSchema isn't serializable"); + checkState(isSerializable(startCursor), "StartCursor isn't serializable"); + checkState(isSerializable(stopCursor), "StopCursor isn't serializable"); + checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable"); + checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable"); + + // Check builder configuration. + SourceConfiguration sourceConfiguration = + configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new); + + return new PulsarSource<>( + sourceConfiguration, + subscriber, + rangeGenerator, + startCursor, + stopCursor, + boundedness, + deserializationSchema, + pulsarCrypto); + } + + // ------------- private helpers -------------- + + /** Helper method for java compiler recognizes the generic type. 
*/ + @SuppressWarnings("unchecked") + private <T extends OUT> PulsarSourceBuilder<T> specialized() { + return (PulsarSourceBuilder<T>) this; + } + + /** Topic name and topic pattern are conflict, make sure they are set only once. */ + private void ensureSubscriberIsNull(String attemptingSubscribeMode) { + if (subscriber != null) { + throw new IllegalStateException( + String.format( + "Cannot use %s for consumption because a %s is already set for consumption", + attemptingSubscribeMode, subscriber.getClass().getSimpleName())); + } + } + + private void ensureSchemaTypeIsValid(Schema<?> schema) { + SchemaInfo info = schema.getSchemaInfo(); + if (info.getType() == SchemaType.AUTO_CONSUME) { + throw new IllegalArgumentException( + "Auto schema is only supported by providing a GenericRecordDeserializer"); + } + if (info.getType() == SchemaType.AUTO_PUBLISH) { + throw new IllegalStateException( + "Auto produce schema is not supported in consuming messages"); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java new file mode 100644 index 0000000000..10139a000e --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.source.reader; + +import org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderBase; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.common.schema.BytesSchema; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.reader.PulsarRecordEmitter; +import org.apache.flink.connector.pulsar.source.reader.PulsarSourceFetcherManager; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext; +import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient; + +/** + * The source reader for pulsar subscription Failover and Exclusive, which consumes the ordered + * messages. + * + * @param <OUT> The output message type for flink. 
+ * Modify from {@link org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader} + */ +@Internal +public class PulsarSourceReader<OUT> + extends + SourceReaderBase<Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class); + + private final SourceConfiguration sourceConfiguration; + private final PulsarClient pulsarClient; + @VisibleForTesting + final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit; + private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits; + private final AtomicReference<Throwable> cursorCommitThrowable; + private final PulsarDeserializationSchema<OUT> deserializationSchema; + private ScheduledExecutorService cursorScheduler; + + private PulsarSourceReader( + FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue, + PulsarSourceFetcherManager fetcherManager, + PulsarDeserializationSchema<OUT> deserializationSchema, + SourceConfiguration sourceConfiguration, + PulsarClient pulsarClient, + SourceReaderContext context) { + super( + elementsQueue, + fetcherManager, + new PulsarRecordEmitter<>(deserializationSchema), + sourceConfiguration, + context); + + this.deserializationSchema = deserializationSchema; + this.sourceConfiguration = sourceConfiguration; + this.pulsarClient = pulsarClient; + + this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); + this.cursorsOfFinishedSplits = new ConcurrentHashMap<>(); + this.cursorCommitThrowable = new AtomicReference<>(); + } + + @Override + public void start() { + super.start(); + if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) { + this.cursorScheduler = Executors.newSingleThreadScheduledExecutor(); + + // Auto commit cursor, this could be enabled when checkpoint is also enabled. 
+ cursorScheduler.scheduleAtFixedRate( + this::cumulativeAcknowledgmentMessage, + sourceConfiguration.getMaxFetchTime().toMillis(), + sourceConfiguration.getAutoCommitCursorInterval(), + TimeUnit.MILLISECONDS); + } + } + + @Override + public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception { + Throwable cause = cursorCommitThrowable.get(); + if (cause != null) { + throw new FlinkRuntimeException("An error occurred in acknowledge message.", cause); + } + + return super.pollNext(output); + } + + @Override + protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) { + // Close all the finished splits. + for (String splitId : finishedSplitIds.keySet()) { + ((PulsarSourceFetcherManager) splitFetcherManager).closeFetcher(splitId); + } + + // We don't require new splits, all the splits are pre-assigned by source enumerator. + if (LOG.isDebugEnabled()) { + LOG.debug("onSplitFinished event: {}", finishedSplitIds); + } + + for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) { + PulsarPartitionSplitState state = entry.getValue(); + MessageId latestConsumedId = state.getLatestConsumedId(); + if (latestConsumedId != null) { + cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId); + } + } + } + + @Override + protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) { + return new PulsarPartitionSplitState(split); + } + + @Override + protected PulsarPartitionSplit toSplitType( + String splitId, PulsarPartitionSplitState splitState) { + return splitState.toPulsarPartitionSplit(); + } + + @Override + public void pauseOrResumeSplits( + Collection<String> splitsToPause, Collection<String> splitsToResume) { + splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); + } + + @Override + public List<PulsarPartitionSplit> snapshotState(long checkpointId) { + List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId); + + // Perform a snapshot for 
these splits. + Map<TopicPartition, MessageId> cursors = + cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); + // Put the cursors of the active splits. + for (PulsarPartitionSplit split : splits) { + MessageId latestConsumedId = split.getLatestConsumedId(); + if (latestConsumedId != null) { + cursors.put(split.getPartition(), latestConsumedId); + } + } + // Put cursors of all the finished splits. + cursors.putAll(cursorsOfFinishedSplits); + if (deserializationSchema instanceof PulsarTableDeserializationSchema) { + ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId); + } + return splits; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + LOG.debug("Committing cursors for checkpoint {}", checkpointId); + Map<TopicPartition, MessageId> cursors = cursorsToCommit.get(checkpointId); + try { + ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors); + LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId); + + // Clean up the cursors. + cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet()); + cursorsToCommit.headMap(checkpointId + 1).clear(); + if (deserializationSchema instanceof PulsarTableDeserializationSchema) { + PulsarTableDeserializationSchema schema = (PulsarTableDeserializationSchema) deserializationSchema; + schema.flushAudit(); + schema.updateLastCheckpointId(checkpointId); + } + } catch (Exception e) { + LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e); + cursorCommitThrowable.compareAndSet(null, e); + } + + } + + @Override + public void close() throws Exception { + if (cursorScheduler != null) { + cursorScheduler.shutdown(); + } + + // Close the all the consumers. + super.close(); + + // Close shared pulsar resources. 
+ pulsarClient.shutdown(); + } + + // ----------------- helper methods -------------- + + /** Acknowledge the pulsar topic partition cursor by the last consumed message id. */ + private void cumulativeAcknowledgmentMessage() { + Map<TopicPartition, MessageId> cursors = new HashMap<>(cursorsOfFinishedSplits); + + // We reuse snapshotState for acquiring a consume status snapshot. + // So the checkpoint didn't really happen, so we just pass a fake checkpoint id. + List<PulsarPartitionSplit> splits = super.snapshotState(1L); + for (PulsarPartitionSplit split : splits) { + MessageId latestConsumedId = split.getLatestConsumedId(); + if (latestConsumedId != null) { + cursors.put(split.getPartition(), latestConsumedId); + } + } + + try { + ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors); + // Clean up the finish splits. + cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet()); + } catch (Exception e) { + LOG.error("Fail in auto cursor commit.", e); + cursorCommitThrowable.compareAndSet(null, e); + } + } + + /** Factory method for creating PulsarSourceReader. */ + public static <OUT> PulsarSourceReader<OUT> create( + SourceConfiguration sourceConfiguration, + PulsarDeserializationSchema<OUT> deserializationSchema, + PulsarCrypto pulsarCrypto, + SourceReaderContext readerContext) + throws Exception { + + // Create a message queue with the predefined source option. + int queueCapacity = sourceConfiguration.getMessageQueueCapacity(); + FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue = + new FutureCompletingBlockingQueue<>(queueCapacity); + + PulsarClient pulsarClient = createClient(sourceConfiguration); + + // Initialize the deserialization schema before creating the pulsar reader. 
+ PulsarDeserializationSchemaInitializationContext initializationContext = + new PulsarDeserializationSchemaInitializationContext(readerContext, pulsarClient); + deserializationSchema.open(initializationContext, sourceConfiguration); + + // Choose the right schema bytes to use. + Schema<byte[]> schema; + if (sourceConfiguration.isEnableSchemaEvolution()) { + // Wrap the schema into a byte array schema with extra schema info check. + PulsarSchema<?> pulsarSchema = + ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema(); + schema = new BytesSchema(pulsarSchema); + } else { + schema = Schema.BYTES; + } + + // Create an ordered split reader supplier. + Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier = + () -> new PulsarPartitionSplitReader( + pulsarClient, + sourceConfiguration, + schema, + pulsarCrypto, + readerContext.metricGroup()); + + PulsarSourceFetcherManager fetcherManager = + new PulsarSourceFetcherManager( + elementsQueue, splitReaderSupplier, readerContext.getConfiguration()); + + return new PulsarSourceReader<>( + elementsQueue, + fetcherManager, + deserializationSchema, + sourceConfiguration, + pulsarClient, + readerContext); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java index deb240c7ed..e528f983e1 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.protocol.node.ExtractNode; import 
org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory; import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource; @@ -129,7 +130,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTab PulsarSourceOptions.SOURCE_CONFIG_PREFIX, PulsarSourceOptions.CONSUMER_CONFIG_PREFIX, PulsarSinkOptions.PRODUCER_CONFIG_PREFIX, - PulsarSinkOptions.SINK_CONFIG_PREFIX); + PulsarSinkOptions.SINK_CONFIG_PREFIX, + ExtractNode.INLONG_MSG); validatePrimaryKeyConstraints( context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), helper); diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java index 17466899d7..eeeafa6872 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java @@ -19,7 +19,7 @@ package org.apache.inlong.sort.pulsar.table.source; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -65,7 +65,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc private final boolean upsertMode; - private SourceMetricData sourceMetricData; + private SourceExactlyMetric sourceExactlyMetric; private MetricOption metricOption; 
@@ -94,7 +94,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc keyDeserialization.open(context); } if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption); + sourceExactlyMetric = new SourceExactlyMetric(metricOption); } valueDeserialization.open(context); } @@ -117,7 +117,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc return; } MetricsCollector<RowData> metricsCollector = - new MetricsCollector<>(collector, sourceMetricData); + new MetricsCollector<>(collector, sourceExactlyMetric); valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData)); @@ -130,4 +130,22 @@ public class PulsarTableDeserializationSc public TypeInformation<RowData> getProducedType() { return producedTypeInfo; }
+    /** Asks the metric to flush its audit data; does nothing when no metric was created (no metric option). */
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+    /** Forwards the id of the checkpoint being taken to the metric; does nothing when no metric was created. */
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+    /** Forwards the id of the completed checkpoint to the metric; does nothing when no metric was created. */
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java index bf48356d26..0445739212 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java @@ -17,9 +17,10 @@ package
org.apache.inlong.sort.pulsar.table.source; +import org.apache.inlong.sort.pulsar.source.PulsarSource; +import org.apache.inlong.sort.pulsar.source.PulsarSourceBuilder; + import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.connector.pulsar.source.PulsarSource; -import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor; diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index c340f78668..8d1fd1ede9 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -856,6 +856,9 @@ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE