This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e78b239ab7 [INLONG-10682][Sort] Make pulsar source support to send 
audit information exactly once and add ExtractNode.INLONG_MSG to helper 
validate (#10686)
e78b239ab7 is described below

commit e78b239ab793641b3ebf67e851020b291b0aece9
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Mon Jul 22 17:38:32 2024 +0800

    [INLONG-10682][Sort] Make pulsar source support to send audit information 
exactly once and add ExtractNode.INLONG_MSG to helper validate (#10686)
---
 .../inlong/sort/pulsar/source/PulsarSource.java    | 187 ++++++
 .../sort/pulsar/source/PulsarSourceBuilder.java    | 647 +++++++++++++++++++++
 .../pulsar/source/reader/PulsarSourceReader.java   | 314 ++++++++++
 .../sort/pulsar/table/PulsarTableFactory.java      |   4 +-
 .../source/PulsarTableDeserializationSchema.java   |  26 +-
 .../pulsar/table/source/PulsarTableSource.java     |   5 +-
 licenses/inlong-sort-connectors/LICENSE            |   3 +
 7 files changed, 1179 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
new file mode 100644
index 0000000000..e26ee645aa
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.inlong.sort.pulsar.source.reader.PulsarSourceReader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records
+ * of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setTopics(TOPIC1, TOPIC2)
+ *     .setServiceUrl(getServiceUrl())
+ *     .setSubscriptionName("test")
+ *     .setDeserializationSchema(new SimpleStringSchema())
+ *     .setBoundedStopCursor(StopCursor.defaultStopCursor())
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * @param <OUT> The output type of the source.
+ * Modified from {@link org.apache.flink.connector.pulsar.source.PulsarSource}
+ */
+@PublicEvolving
+public final class PulsarSource<OUT>
+        implements
+            Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>,
+            ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 7773108631275567433L;
+
+    /**
+     * The configuration for the Pulsar source; we don't use Pulsar's own configuration class
+     * directly.
+     */
+    private final SourceConfiguration sourceConfiguration;
+
+    private final PulsarSubscriber subscriber;
+
+    private final RangeGenerator rangeGenerator;
+
+    private final StartCursor startCursor;
+
+    private final StopCursor stopCursor;
+
+    private final Boundedness boundedness;
+
+    /** The pulsar deserialization schema is used for deserializing messages. */
+    private final PulsarDeserializationSchema<OUT> deserializationSchema;
+
+    private final PulsarCrypto pulsarCrypto;
+
+    /**
+     * The constructor for PulsarSource; it's package-private to force construction through
+     * {@link PulsarSourceBuilder}.
+     */
+    PulsarSource(
+            SourceConfiguration sourceConfiguration,
+            PulsarSubscriber subscriber,
+            RangeGenerator rangeGenerator,
+            StartCursor startCursor,
+            StopCursor stopCursor,
+            Boundedness boundedness,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            PulsarCrypto pulsarCrypto) {
+        this.sourceConfiguration = sourceConfiguration;
+        this.subscriber = subscriber;
+        this.rangeGenerator = rangeGenerator;
+        this.startCursor = startCursor;
+        this.stopCursor = stopCursor;
+        this.boundedness = boundedness;
+        this.deserializationSchema = deserializationSchema;
+        this.pulsarCrypto = pulsarCrypto;
+    }
+
+    /**
+     * Get a PulsarSourceBuilder to build a {@link PulsarSource}.
+     *
+     * @return a Pulsar source builder.
+     */
+    public static <OUT> PulsarSourceBuilder<OUT> builder() {
+        return new PulsarSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness;
+    }
+
+    @Internal
+    @Override
+    public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return PulsarSourceReader.create(
+                sourceConfiguration, deserializationSchema, pulsarCrypto, readerContext);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<PulsarPartitionSplit> enumContext) throws PulsarClientException {
+        return new PulsarSourceEnumerator(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                enumContext);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
+            PulsarSourceEnumState checkpoint)
+            throws PulsarClientException {
+        return new PulsarSourceEnumerator(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                enumContext,
+                checkpoint);
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<PulsarPartitionSplit> getSplitSerializer() {
+        return PulsarPartitionSplitSerializer.INSTANCE;
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<PulsarSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return PulsarSourceEnumStateSerializer.INSTANCE;
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+}
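For reference, a minimal wiring sketch for the source added above (the broker URL, topic, and subscription name below are made-up placeholders; env.fromSource and WatermarkStrategy are standard Flink 1.18 APIs, and enabling checkpointing is what drives the reader's cursor commit and audit flush described later in this commit):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.inlong.sort.pulsar.source.PulsarSource;

public class PulsarSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing drives cursor acknowledgement and the audit flush in this connector.
        env.enableCheckpointing(30_000L);

        PulsarSource<String> source = PulsarSource.<String>builder()
                .setServiceUrl("pulsar://localhost:6650")             // assumed broker address
                .setSubscriptionName("inlong-sort-demo")              // assumed subscription name
                .setTopics("persistent://public/default/demo-topic")  // assumed topic
                .setDeserializationSchema(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
        stream.print();
        env.execute("pulsar-source-demo");
    }
}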
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
new file mode 100644
index 0000000000..cfabe671a4
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializer;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeInformationWrapper;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users to construct a
+ * {@link PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializationSchema(new SimpleStringSchema())
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, subscription name, topics to consume, and the record deserializer are
+ * required fields that must be set.
+ *
+ * <p>To specify the starting position of the PulsarSource, one can call {@link
+ * #setStartCursor(StartCursor)}.
+ *
+ * <p>By default, the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one can call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below.
+ * For example, the following PulsarSource stops after it consumes messages up to the event time
+ * at which the Flink job started.
+ *
+ * <p>To stop the connector, the user has to disable auto partition discovery, because auto
+ * partition discovery always expects new splits to come and never exits. To disable auto
+ * partition discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializationSchema(new SimpleStringSchema())
+ *     .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <OUT> The output type of the source.
+ * Modified from {@link org.apache.flink.connector.pulsar.source.PulsarSourceBuilder}
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final PulsarConfigBuilder configBuilder;
+
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private StartCursor startCursor;
+    private StopCursor stopCursor;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<OUT> deserializationSchema;
+    private PulsarCrypto pulsarCrypto;
+
+    // private builder constructor.
+    PulsarSourceBuilder() {
+        this.configBuilder = new PulsarConfigBuilder();
+        this.startCursor = StartCursor.defaultStartCursor();
+        this.stopCursor = StopCursor.defaultStopCursor();
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     * @deprecated The admin URL is no longer required; this method is a no-op and simply returns the builder.
+     */
+    @Deprecated
+    public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
+        return this;
+    }
+
+    /**
+     * Sets the service URL for the Pulsar client used by the PulsarSource.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) {
+        return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+    }
+
+    /**
+     * Sets the name for this pulsar subscription.
+     *
+     * @param subscriptionName the name of the subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
+        return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink source. Some topics may not exist yet; consuming from
+     * such a non-existent topic wouldn't throw any exception, but the better solution is to
+     * consume by using a topic regex. You can set topics only once, either with {@link #setTopics}
+     * or {@link #setTopicPattern} in this builder.
+     *
+     * @param topics The topics from which you would like to consume messages.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink source. Some topics may not exist yet; consuming from
+     * such a non-existent topic wouldn't throw any exception, but the better solution is to
+     * consume by using a topic regex. You can set topics only once, either with {@link #setTopics}
+     * or {@link #setTopicPattern} in this builder.
+     *
+     * @param topics The topics from which you would like to consume messages.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
+        ensureSubscriberIsNull("topics");
+        List<String> distinctTopics = TopicNameUtils.distinctTopics(topics);
+        this.subscriber = PulsarSubscriber.getTopicListSubscriber(distinctTopics);
+        return this;
+    }
+
+    /**
+     * Set a topic pattern to consume from, given as a Java regex string. You can set topics only
+     * once, either with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * <p>Remember that we only subscribe to one tenant and one namespace when using a regular
+     * expression. If you didn't provide the tenant and namespace in the given topic pattern, we
+     * will use the default ones instead.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern) {
+        return setTopicPattern(Pattern.compile(topicsPattern));
+    }
+
+    /**
+     * Set a topic pattern to consume from, given as a Java {@link Pattern}. You can set topics
+     * only once, either with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * <p>Remember that we only subscribe to one tenant and one namespace when using a regular
+     * expression. If you didn't provide the tenant and namespace in the given topic pattern, we
+     * will use the default ones instead.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern) {
+        return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+    }
+
+    /**
+     * Set a topic pattern to consume from, given as a Java regex string. You can set topics only
+     * once, either with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * <p>Remember that we only subscribe to one tenant and one namespace when using a regular
+     * expression. If you didn't provide the tenant and namespace in the given topic pattern, we
+     * will use the default ones instead.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(
+            String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
+        return setTopicPattern(Pattern.compile(topicsPattern), regexSubscriptionMode);
+    }
+
+    /**
+     * Set a topic pattern to consume from, given as a Java {@link Pattern}. You can set topics
+     * only once, either with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * <p>Remember that we only subscribe to one tenant and one namespace when using a regular
+     * expression. If you didn't provide the tenant and namespace in the given topic pattern, we
+     * will use the default ones instead.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can
+     *     pick a certain type of topic.
+     *     <ul>
+     *       <li>PersistentOnly: only subscribe to persistent topics.
+     *       <li>NonPersistentOnly: only subscribe to non-persistent topics.
+     *       <li>AllTopics: subscribe to both persistent and non-persistent topics.
+     *     </ul>
+     *
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(
+            Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
+        ensureSubscriberIsNull("topic pattern");
+        this.subscriber =
+                PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode);
+        return this;
+    }
+
+    /**
+     * The consumer name is informative, and it can be used to identify a particular consumer
+     * instance from the topic stats.
+     */
+    public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
+        return setConfig(PULSAR_CONSUMER_NAME, consumerName);
+    }
+
+    /**
+     * If you enable this option, we would consume and deserialize the message by using Pulsar
+     * {@link Schema}.
+     */
+    public PulsarSourceBuilder<OUT> enableSchemaEvolution() {
+        configBuilder.set(PULSAR_READ_SCHEMA_EVOLUTION, true);
+        return this;
+    }
+
+    /**
+     * Set a topic range generator for consuming a subset of keys.
+     *
+     * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for the
+     *     given topic.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator) {
+        this.rangeGenerator = checkNotNull(rangeGenerator);
+        return this;
+    }
+
+    /**
+     * Specify from which offsets the PulsarSource should start consuming by providing a
+     * {@link StartCursor}.
+     *
+     * @param startCursor set the starting offsets for the Source.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
+        this.startCursor = checkNotNull(startCursor);
+        return this;
+    }
+
+    /**
+     * By default, the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+     * never stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+     * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one can call {@link
+     * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below.
+     *
+     * <p>This method is different from {@link #setBoundedStopCursor(StopCursor)} in that after
+     * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will
+     * still return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the
+     * stopping offsets specified by the stopping offsets {@link StopCursor}.
+     *
+     * <p>To stop the connector, the user has to disable auto partition discovery, because auto
+     * partition discovery always expects new splits to come and never exits. To disable auto
+     * partition discovery, use builder.setConfig({@link
+     * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+     *
+     * @param stopCursor The {@link StopCursor} to specify the stopping offset.
+     * @return this PulsarSourceBuilder.
+     * @see #setBoundedStopCursor(StopCursor)
+     */
+    public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) {
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.stopCursor = checkNotNull(stopCursor);
+        return this;
+    }
+
+    /**
+     * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run
+     * in {@link Boundedness#BOUNDED} manner and stop at some point, one can set a {@link StopCursor}
+     * to specify the stopping offsets for each partition. When all the partitions have reached
+     * their stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link #setUnboundedStopCursor(StopCursor)} in that after
+     * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will
+     * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
+     *
+     * @param stopCursor the {@link StopCursor} to specify the stopping offsets.
+     * @return this PulsarSourceBuilder.
+     * @see #setUnboundedStopCursor(StopCursor)
+     */
+    public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) {
+        this.boundedness = Boundedness.BOUNDED;
+        this.stopCursor = checkNotNull(stopCursor);
+        return this;
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using Flink's {@link DeserializationSchema}. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            DeserializationSchema<T> deserializationSchema) {
+        return setDeserializationSchema(
+                new PulsarDeserializationSchemaWrapper<>(deserializationSchema));
+    }
+
+    /**
+     * Deserialize the messages from Pulsar by using {@link Schema#AUTO_CONSUME()}. It will turn the
+     * pulsar message into a {@link GenericRecord} first. Using this method, you can consume
+     * messages with multiple schemas from the same topic.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            GenericRecordDeserializer<T> deserializer) {
+        return setDeserializationSchema(new GenericRecordDeserializationSchema<>(deserializer));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+     * types</a> here.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(Schema<T> schema) {
+        ensureSchemaTypeIsValid(schema);
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            Schema<T> schema, Class<T> typeClass) {
+        ensureSchemaTypeIsValid(schema);
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, typeClass));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+     * consume the pulsar message as a byte array and decode the message by using flink's logic.
+     *
+     * <p>We only support <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here.
+     */
+    public <K, V, T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) {
+        ensureSchemaTypeIsValid(schema);
+        return setDeserializationSchema(new PulsarSchemaWrapper<>(schema, keyClass, valueClass));
+    }
+
+    /**
+     * Deserialize messages from Pulsar by using Flink's {@link TypeInformation}. This method is
+     * only used for handling messages that were written into Pulsar by {@link TypeInformation}.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            TypeInformation<T> information, ExecutionConfig config) {
+        return setDeserializationSchema(new PulsarTypeInformationWrapper<>(information, config));
+    }
+
+    /**
+     * PulsarDeserializationSchema is required for deserializing messages from Pulsar and getting
+     * the {@link TypeInformation} for message serialization in Flink.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            PulsarDeserializationSchema<T> deserializationSchema) {
+        PulsarSourceBuilder<T> self = specialized();
+        self.deserializationSchema = deserializationSchema;
+        return self;
+    }
+
+    /**
+     * Sets a {@link PulsarCrypto}. Configure the key reader and keys to be used to decrypt the
+     * message payloads.
+     *
+     * @param pulsarCrypto PulsarCrypto object.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setPulsarCrypto(
+            PulsarCrypto pulsarCrypto, ConsumerCryptoFailureAction action) {
+        this.pulsarCrypto = checkNotNull(pulsarCrypto);
+        configBuilder.set(PULSAR_CRYPTO_FAILURE_ACTION, action);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParamsString string which represents parameters for the Authentication-Plugin,
+     *     e.g., "key1:val1,key2:val2"
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAuthentication(
+            String authPluginClassName, String authParamsString) {
+        checkArgument(
+                !configBuilder.contains(PULSAR_AUTH_PARAM_MAP),
+                "Duplicated authentication setting.");
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParams map which represents parameters for the Authentication-Plugin
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAuthentication(
+            String authPluginClassName, Map<String, String> authParams) {
+        checkArgument(
+                !configBuilder.contains(PULSAR_AUTH_PARAMS), "Duplicated authentication setting.");
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
+        return this;
+    }
+
+    /**
+     * Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be
+     * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * <p>Make sure each option is set only once, or always set to the same value.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this PulsarSourceBuilder.
+     */
+    public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value) {
+        configBuilder.set(key, value);
+        return this;
+    }
+
+    /**
+     * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be
+     * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * @param config the config to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setConfig(Configuration config) {
+        configBuilder.set(config);
+        return this;
+    }
+
+    /**
+     * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be
+     * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * <p>This method is mainly used for future flink SQL binding.
+     *
+     * @param properties the config properties to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setProperties(Properties properties) {
+        configBuilder.set(properties);
+        return this;
+    }
+
+    /**
+     * Build the {@link PulsarSource}.
+     *
+     * @return a PulsarSource with the settings made for this builder.
+     */
+    @SuppressWarnings("java:S3776")
+    public PulsarSource<OUT> build() {
+        // Ensure the topic subscriber for pulsar.
+        checkNotNull(subscriber, "No topic names or topic pattern are provided.");
+
+        if (rangeGenerator == null) {
+            LOG.warn(
+                    "No range generator provided, we would use the FullRangeGenerator as the default range generator.");
+            this.rangeGenerator = new FullRangeGenerator();
+        }
+
+        if (boundedness == null) {
+            LOG.warn("No boundedness was set, mark it as an endless stream.");
+            this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        }
+        if (boundedness == Boundedness.BOUNDED
+                && configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) {
+            LOG.warn(
+                    "{} property is overridden to -1 because the source is bounded.",
+                    PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
+            configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
+        }
+
+        checkNotNull(deserializationSchema, "deserializationSchema should be set.");
+        // Schema evolution validation.
+        if (Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION))) {
+            checkState(
+                    deserializationSchema instanceof PulsarSchemaWrapper,
+                    "When enabling schema evolution, you must provide a Pulsar Schema in builder's setDeserializationSchema method.");
+        } else if (deserializationSchema instanceof PulsarSchemaWrapper) {
+            LOG.info(
+                    "It seems like you are consuming messages by using Pulsar Schema."
+                            + " You can call builder.enableSchemaEvolution() to enable schema evolution for a better Pulsar Schema check."
+                            + " We bypass the Schema check by default.");
+        }
+
+        if (pulsarCrypto == null) {
+            this.pulsarCrypto = PulsarCrypto.disabled();
+        }
+
+        if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
+            LOG.warn(
+                    "We recommend setting a readable consumer name through setConsumerName(String) in production mode.");
+        } else {
+            String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+            if (!consumerName.contains("%s")) {
+                configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
+            }
+        }
+
+        // Make sure they are serializable.
+        checkState(
+                isSerializable(deserializationSchema),
+                "PulsarDeserializationSchema isn't serializable");
+        checkState(isSerializable(startCursor), "StartCursor isn't serializable");
+        checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
+        checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
+        checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");
+
+        // Check builder configuration.
+        SourceConfiguration sourceConfiguration =
+                configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);
+
+        return new PulsarSource<>(
+                sourceConfiguration,
+                subscriber,
+                rangeGenerator,
+                startCursor,
+                stopCursor,
+                boundedness,
+                deserializationSchema,
+                pulsarCrypto);
+    }
+
+    // ------------- private helpers --------------
+
+    /** Helper method so the Java compiler recognizes the generic type. */
+    @SuppressWarnings("unchecked")
+    private <T extends OUT> PulsarSourceBuilder<T> specialized() {
+        return (PulsarSourceBuilder<T>) this;
+    }
+
+    /** Topic name and topic pattern conflict with each other; make sure only one of them is set. */
+    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+        if (subscriber != null) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot use %s for consumption because a %s is already set for consumption",
+                            attemptingSubscribeMode, subscriber.getClass().getSimpleName()));
+        }
+    }
+
+    private void ensureSchemaTypeIsValid(Schema<?> schema) {
+        SchemaInfo info = schema.getSchemaInfo();
+        if (info.getType() == SchemaType.AUTO_CONSUME) {
+            throw new IllegalArgumentException(
+                    "Auto schema is only supported by providing a GenericRecordDeserializer");
+        }
+        if (info.getType() == SchemaType.AUTO_PUBLISH) {
+            throw new IllegalStateException(
+                    "Auto produce schema is not supported in consuming messages");
+        }
+    }
+}
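A short sketch of the bounded, stop-at-offset setup that the class-level javadoc above describes, with auto partition discovery disabled so the stream can actually stop (the broker URL, topic, and subscription name are assumptions for illustration; StopCursor.atEventTime and PULSAR_PARTITION_DISCOVERY_INTERVAL_MS come from the code above):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;

import org.apache.inlong.sort.pulsar.source.PulsarSource;

import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;

public class BoundedPulsarSourceExample {

    public static void main(String[] args) {
        // Stops once every partition reaches the event time at which the source was built,
        // mirroring the second example in the builder's class javadoc.
        PulsarSource<String> source = PulsarSource.<String>builder()
                .setServiceUrl("pulsar://localhost:6650")             // assumed broker address
                .setSubscriptionName("inlong-sort-bounded")           // assumed subscription name
                .setTopics("persistent://public/default/demo-topic")  // assumed topic
                .setDeserializationSchema(new SimpleStringSchema())
                // Disable auto partition discovery so the unbounded source can actually stop.
                .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L)
                .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
                .build();
        // Hand the source to env.fromSource(...) as in the earlier sketch.
    }
}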
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
new file mode 100644
index 0000000000..10139a000e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source.reader;
+
+import org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.reader.PulsarRecordEmitter;
+import org.apache.flink.connector.pulsar.source.reader.PulsarSourceFetcherManager;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+
+/**
+ * The source reader for the Pulsar subscription types Failover and Exclusive, which consumes the
+ * ordered messages.
+ *
+ * @param <OUT> The output message type for flink.
+ * Modified from {@link org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader}
+ */
+@Internal
+public class PulsarSourceReader<OUT>
+        extends
+            SourceReaderBase<Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);
+
+    private final SourceConfiguration sourceConfiguration;
+    private final PulsarClient pulsarClient;
+    @VisibleForTesting
+    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;
+    private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;
+    private final AtomicReference<Throwable> cursorCommitThrowable;
+    private final PulsarDeserializationSchema<OUT> deserializationSchema;
+    private ScheduledExecutorService cursorScheduler;
+
+    private PulsarSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
+            PulsarSourceFetcherManager fetcherManager,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            SourceReaderContext context) {
+        super(
+                elementsQueue,
+                fetcherManager,
+                new PulsarRecordEmitter<>(deserializationSchema),
+                sourceConfiguration,
+                context);
+
+        this.deserializationSchema = deserializationSchema;
+        this.sourceConfiguration = sourceConfiguration;
+        this.pulsarClient = pulsarClient;
+
+        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.cursorCommitThrowable = new AtomicReference<>();
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+            this.cursorScheduler = Executors.newSingleThreadScheduledExecutor();
+
+            // Auto commit cursor, this could be enabled when checkpoint is also enabled.
+            cursorScheduler.scheduleAtFixedRate(
+                    this::cumulativeAcknowledgmentMessage,
+                    sourceConfiguration.getMaxFetchTime().toMillis(),
+                    sourceConfiguration.getAutoCommitCursorInterval(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
+        Throwable cause = cursorCommitThrowable.get();
+        if (cause != null) {
+            throw new FlinkRuntimeException("An error occurred while acknowledging messages.", cause);
+        }
+
+        return super.pollNext(output);
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
+        // Close all the finished splits.
+        for (String splitId : finishedSplitIds.keySet()) {
+            ((PulsarSourceFetcherManager) splitFetcherManager).closeFetcher(splitId);
+        }
+
+        // We don't require new splits, all the splits are pre-assigned by source enumerator.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+        }
+
+        for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) {
+            PulsarPartitionSplitState state = entry.getValue();
+            MessageId latestConsumedId = state.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
+            }
+        }
+    }
+
+    @Override
+    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
+        return new PulsarPartitionSplitState(split);
+    }
+
+    @Override
+    protected PulsarPartitionSplit toSplitType(
+            String splitId, PulsarPartitionSplitState splitState) {
+        return splitState.toPulsarPartitionSplit();
+    }
+
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
+
+        // Perform a snapshot for these splits.
+        Map<TopicPartition, MessageId> cursors =
+                cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
+        // Put the cursors of the active splits.
+        for (PulsarPartitionSplit split : splits) {
+            MessageId latestConsumedId = split.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursors.put(split.getPartition(), latestConsumedId);
+            }
+        }
+        // Put cursors of all the finished splits.
+        cursors.putAll(cursorsOfFinishedSplits);
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+            ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
+        }
+        return splits;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
+        Map<TopicPartition, MessageId> cursors = cursorsToCommit.get(checkpointId);
+        try {
+            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+            LOG.debug("Successfully acknowledged cursors for checkpoint {}", checkpointId);
+
+            // Clean up the cursors.
+            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+            cursorsToCommit.headMap(checkpointId + 1).clear();
+            if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+                PulsarTableDeserializationSchema schema = (PulsarTableDeserializationSchema) deserializationSchema;
+                schema.flushAudit();
+                schema.updateLastCheckpointId(checkpointId);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
+            cursorCommitThrowable.compareAndSet(null, e);
+            cursorCommitThrowable.compareAndSet(null, e);
+        }
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (cursorScheduler != null) {
+            cursorScheduler.shutdown();
+        }
+
+        // Close all the consumers.
+        super.close();
+
+        // Close shared pulsar resources.
+        pulsarClient.shutdown();
+    }
+
+    // ----------------- helper methods --------------
+
+    /** Acknowledge the pulsar topic partition cursor by the last consumed message id. */
+    private void cumulativeAcknowledgmentMessage() {
+        Map<TopicPartition, MessageId> cursors = new HashMap<>(cursorsOfFinishedSplits);
+
+        // We reuse snapshotState for acquiring a consume status snapshot.
+        // The checkpoint didn't really happen, so we just pass a fake checkpoint id.
+        List<PulsarPartitionSplit> splits = super.snapshotState(1L);
+        for (PulsarPartitionSplit split : splits) {
+            MessageId latestConsumedId = split.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursors.put(split.getPartition(), latestConsumedId);
+            }
+        }
+
+        try {
+            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+            // Clean up the finished splits.
+            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+        } catch (Exception e) {
+            LOG.error("Fail in auto cursor commit.", e);
+            cursorCommitThrowable.compareAndSet(null, e);
+        }
+    }
+
+    /** Factory method for creating PulsarSourceReader. */
+    public static <OUT> PulsarSourceReader<OUT> create(
+            SourceConfiguration sourceConfiguration,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            PulsarCrypto pulsarCrypto,
+            SourceReaderContext readerContext)
+            throws Exception {
+
+        // Create a message queue with the predefined source option.
+        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>(queueCapacity);
+
+        PulsarClient pulsarClient = createClient(sourceConfiguration);
+
+        // Initialize the deserialization schema before creating the pulsar reader.
+        PulsarDeserializationSchemaInitializationContext initializationContext =
+                new PulsarDeserializationSchemaInitializationContext(readerContext, pulsarClient);
+        deserializationSchema.open(initializationContext, sourceConfiguration);
+
+        // Choose the right schema bytes to use.
+        Schema<byte[]> schema;
+        if (sourceConfiguration.isEnableSchemaEvolution()) {
+            // Wrap the schema into a byte array schema with extra schema info check.
+            PulsarSchema<?> pulsarSchema =
+                    ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
+            schema = new BytesSchema(pulsarSchema);
+        } else {
+            schema = Schema.BYTES;
+        }
+
+        // Create an ordered split reader supplier.
+        Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
+                () -> new PulsarPartitionSplitReader(
+                        pulsarClient,
+                        sourceConfiguration,
+                        schema,
+                        pulsarCrypto,
+                        readerContext.metricGroup());
+
+        PulsarSourceFetcherManager fetcherManager =
+                new PulsarSourceFetcherManager(
+                        elementsQueue, splitReaderSupplier, readerContext.getConfiguration());
+
+        return new PulsarSourceReader<>(
+                elementsQueue,
+                fetcherManager,
+                deserializationSchema,
+                sourceConfiguration,
+                pulsarClient,
+                readerContext);
+    }
+}
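How this reader achieves the "audit exactly once" behavior from the commit title: snapshotState() tags the in-flight audit counters with the current checkpoint id, and notifyCheckpointComplete() flushes them only after the Pulsar cursors for that checkpoint have been acknowledged. A simplified, illustrative sketch of that bookkeeping (class and field names here are hypothetical, not the actual SourceExactlyMetric implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative checkpoint-scoped counter; the real connector delegates this to SourceExactlyMetric. */
class CheckpointScopedAuditCounter {

    private final Map<Long, Long> recordsByCheckpoint = new ConcurrentHashMap<>();
    private long currentCheckpointId = -1L;

    /** Called from the deserializer for every emitted record. */
    void increment() {
        recordsByCheckpoint.merge(currentCheckpointId, 1L, Long::sum);
    }

    /** Mirrors updateCurrentCheckpointId(): snapshotState() attributes new records to the new checkpoint. */
    void updateCurrentCheckpointId(long checkpointId) {
        this.currentCheckpointId = checkpointId;
    }

    /** Mirrors flushAudit() + updateLastCheckpointId(): report only data whose cursors were committed. */
    void onCheckpointComplete(long checkpointId) {
        Long count = recordsByCheckpoint.remove(checkpointId);
        if (count != null) {
            reportToAudit(count); // placeholder for the actual audit SDK call
        }
    }

    private void reportToAudit(long count) {
        // Send the counter to the audit service; intentionally left abstract in this sketch.
    }
}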
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index deb240c7ed..e528f983e1 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
 import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource;
 
@@ -129,7 +130,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTab
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
-                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+                PulsarSinkOptions.SINK_CONFIG_PREFIX,
+                ExtractNode.INLONG_MSG);
 
         validatePrimaryKeyConstraints(
                 context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), helper);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
index 17466899d7..eeeafa6872 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.pulsar.table.source;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -65,7 +65,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc
 
     private final boolean upsertMode;
 
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     private MetricOption metricOption;
 
@@ -94,7 +94,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc
             keyDeserialization.open(context);
         }
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
         valueDeserialization.open(context);
     }
@@ -117,7 +117,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc
             return;
         }
         MetricsCollector<RowData> metricsCollector =
-                new MetricsCollector<>(collector, sourceMetricData);
+                new MetricsCollector<>(collector, sourceExactlyMetric);
 
         valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));
 
@@ -130,4 +130,22 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
     }
+
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index bf48356d26..0445739212 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.sort.pulsar.table.source;
 
+import org.apache.inlong.sort.pulsar.source.PulsarSource;
+import org.apache.inlong.sort.pulsar.source.PulsarSourceBuilder;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index c340f78668..8d1fd1ede9 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -856,6 +856,9 @@
       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
  Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.)
   License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
