This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new ada439ce81 [INLONG-10358][Sort] Make pulsar source support report audit information exactly once (#10511)
ada439ce81 is described below

commit ada439ce812f3947621c8c06e0275fe8aec3848f
Author: XiaoYou201 <>
AuthorDate: Mon Jul 1 10:34:04 2024 +0800

    [INLONG-10358][Sort] Make pulsar source support report audit information exactly once (#10511)
 .../inlong/sort/pulsar/source/    | 186 ++++++++
 .../sort/pulsar/source/    | 521 +++++++++++++++++++++
 .../pulsar/source/   | 128 +++++
 .../source/reader/   | 220 +++++++++
 .../source/reader/      |  87 ++++
 .../source/reader/ | 192 ++++++++
 .../table/    |  27 +-
 .../sort/pulsar/table/       |   3 +-
 licenses/inlong-sort-connectors/LICENSE            |   6 +
 9 files changed, 1364 insertions(+), 6 deletions(-)

diff --git 
new file mode 100644
index 0000000000..b7b608ec82
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+/**
+ * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records
+ * of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setTopics(TOPIC1, TOPIC2)
+ *     .setServiceUrl(getServiceUrl())
+ *     .setAdminUrl(getAdminUrl())
+ *     .setSubscriptionName("test")
+ *     .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ *     .setBoundedStopCursor(StopCursor.latest())
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * <p>Copied from {@link org.apache.flink.connector.pulsar.source.PulsarSource} without
+ * modification.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public final class PulsarSource<OUT>
+        implements
+            Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>,
+            ResultTypeQueryable<OUT> {
+    private static final long serialVersionUID = 7773108631275567433L;
+    /**
+     * The configuration for pulsar source, we don't support the pulsar's 
configuration class
+     * directly.
+     */
+    private final SourceConfiguration sourceConfiguration;
+    private final PulsarSubscriber subscriber;
+    private final RangeGenerator rangeGenerator;
+    private final StartCursor startCursor;
+    private final StopCursor stopCursor;
+    private final Boundedness boundedness;
+    /** The pulsar deserialization schema used for deserializing message. */
+    private final PulsarDeserializationSchema<OUT> deserializationSchema;
+    /**
+     * Creates a source from the settings collected by {@link PulsarSourceBuilder}. The constructor
+     * is package-private so that the builder is the only way to obtain an instance.
+     *
+     * @param sourceConfiguration the source configuration produced by the builder.
+     * @param subscriber the topic-list or topic-pattern subscriber chosen on the builder.
+     * @param rangeGenerator generates the set of topic ranges for each topic.
+     * @param startCursor the position consumption starts from.
+     * @param stopCursor the position consumption stops at.
+     * @param boundedness whether the source is bounded or continuous unbounded.
+     * @param deserializationSchema turns Pulsar messages into {@code OUT} records.
+     */
+    PulsarSource(
+            SourceConfiguration sourceConfiguration,
+            PulsarSubscriber subscriber,
+            RangeGenerator rangeGenerator,
+            StartCursor startCursor,
+            StopCursor stopCursor,
+            Boundedness boundedness,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+        this.boundedness = boundedness;
+        this.startCursor = startCursor;
+        this.stopCursor = stopCursor;
+        this.rangeGenerator = rangeGenerator;
+        this.subscriber = subscriber;
+        this.sourceConfiguration = sourceConfiguration;
+    }
+    /**
+     * Creates a new {@link PulsarSourceBuilder} used to build a {@link PulsarSource}.
+     *
+     * @param <OUT> the output type of the source that will be built.
+     * @return a new Pulsar source builder.
+     */
+    public static <OUT> PulsarSourceBuilder<OUT> builder() {
+        return new PulsarSourceBuilder<>();
+    }
+    /** Reports the boundedness that was chosen on the builder. */
+    @Override
+    public Boundedness getBoundedness() {
+        return this.boundedness;
+    }
+    /**
+     * Creates a source reader for one parallel subtask.
+     *
+     * <p>The deserialization schema is opened first, with an initialization context built from
+     * {@code readerContext} and this source's configuration. Reader construction is then
+     * delegated to {@link PulsarSourceReaderFactory}.
+     *
+     * @param readerContext the Flink context of the reader being created.
+     * @return the reader produced by {@link PulsarSourceReaderFactory}.
+     * @throws Exception if opening the deserialization schema or creating the reader fails.
+     */
+    @Internal
+    @Override
+    public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        // Initialize the deserialization schema before creating the pulsar reader.
+        PulsarDeserializationSchemaInitializationContext initializationContext =
+                new PulsarDeserializationSchemaInitializationContext(readerContext);
+        deserializationSchema.open(initializationContext, sourceConfiguration);
+        return PulsarSourceReaderFactory.create(
+                readerContext, deserializationSchema, sourceConfiguration);
+    }
+    @Internal
+    @Override
+    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> 
+            SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
+        return new PulsarSourceEnumerator(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                enumContext);
+    }
+    @Internal
+    @Override
+    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> 
+            SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
+            PulsarSourceEnumState checkpoint) {
+        return new PulsarSourceEnumerator(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                enumContext,
+                checkpoint);
+    }
+    /** Returns the singleton serializer for {@link PulsarPartitionSplit} instances. */
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<PulsarPartitionSplit> getSplitSerializer() {
+        return PulsarPartitionSplitSerializer.INSTANCE;
+    }
+    /** Returns the singleton serializer for checkpointed {@link PulsarSourceEnumState}. */
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<PulsarSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return PulsarSourceEnumStateSerializer.INSTANCE;
+    }
+    /** The produced type is whatever the configured deserialization schema reports. */
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return this.deserializationSchema.getProducedType();
+    }
diff --git 
new file mode 100644
index 0000000000..d8c05ce152
--- /dev/null
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import static java.lang.Boolean.FALSE;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursor(StartCursor)}.
+ *
+ * <p>By default, the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below.
+ * For example, the following PulsarSource stops after it has consumed the messages up to the event
+ * time at which the Flink job started.
+ *
+ * <p>To stop the connector, the user has to disable auto partition discovery, because auto
+ * partition discovery always expects new splits to arrive and never exits. To disable auto
+ * partition discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1L).
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ *     .builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new 
+ *     .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * <p>Copied from {@link org.apache.flink.connector.pulsar.source.PulsarSourceBuilder} without
+ * modification.
+ *
+ * @param <OUT> The output type of the source.
+ */
+public final class PulsarSourceBuilder<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);
+    /** Collects every option passed through the setters and {@code setConfig}/{@code setProperties}. */
+    private final PulsarConfigBuilder configBuilder;
+    /** Topic-list or topic-pattern subscriber; may only be set once (see ensureSubscriberIsNull). */
+    private PulsarSubscriber subscriber;
+    /** Custom range generator; {@link #build()} replaces it unless the subscription is Key_Shared. */
+    private RangeGenerator rangeGenerator;
+    private StartCursor startCursor;
+    private StopCursor stopCursor;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<OUT> deserializationSchema;
+    /**
+     * Package-private constructor; obtain instances through {@link PulsarSource#builder()}. Starts
+     * with the default start and stop cursors in continuous unbounded mode.
+     */
+    PulsarSourceBuilder() {
+        this.configBuilder = new PulsarConfigBuilder();
+        this.startCursor = StartCursor.defaultStartCursor();
+        this.stopCursor = StopCursor.defaultStopCursor();
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+    /**
+     * Configures the admin endpoint used by the PulsarAdmin of this source.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
+        configBuilder.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+    /**
+     * Configures the Pulsar cluster address that the source's consumers connect to.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) {
+        configBuilder.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+    /**
+     * Sets the name of the Pulsar subscription this source consumes through.
+     *
+     * @param subscriptionName the subscription name.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
+        return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+    }
+    /**
+     * {@link SubscriptionType} is the consuming behavior for Pulsar; the source generates different
+     * splits depending on the given subscription type. Please take some time to consider which
+     * subscription type matches your application best. Default is {@link SubscriptionType#Shared}.
+     *
+     * @param subscriptionType The type of subscription.
+     * @return this PulsarSourceBuilder.
+     * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions">Pulsar
+     *     Subscriptions</a>
+     */
+    public PulsarSourceBuilder<OUT> setSubscriptionType(SubscriptionType subscriptionType) {
+        return setConfig(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+    }
+    /**
+     * Subscribes to a fixed set of topics given as varargs. Topics that do not exist yet do not
+     * cause an exception, but consuming through a topic regex is usually the better choice. Topics
+     * can be set only once, either here or with {@link #setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopics(String... topics) {
+        List<String> topicList = Arrays.asList(topics);
+        return setTopics(topicList);
+    }
+    /**
+     * Subscribes to a fixed list of topics. Duplicate names are removed first. Topics that do not
+     * exist yet do not cause an exception, but consuming through a topic regex is usually the
+     * better choice. Topics can be set only once, either here or with {@link #setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     * @throws IllegalStateException if topics or a topic pattern were already set.
+     */
+    public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
+        ensureSubscriberIsNull("topics");
+        List<String> distinctTopics = TopicNameUtils.distinctTopics(topics);
+        this.subscriber = PulsarSubscriber.getTopicListSubscriber(distinctTopics);
+        return this;
+    }
+    /**
+     * Set a topic pattern to consume from the java regex str. You can set 
topics once either with
+     * {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern) {
+        return setTopicPattern(Pattern.compile(topicsPattern));
+    }
+    /**
+     * Set a topic pattern to consume from the java {@link Pattern}. You can 
set topics once either
+     * with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern) {
+        return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+    }
+    /**
+     * Set a topic pattern to consume from the java regex str. You can set 
topics once either with
+     * {@link #setTopics} or {@link #setTopicPattern} in this builder.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(
+            String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) 
+        return setTopicPattern(Pattern.compile(topicsPattern), 
+    }
+    /**
+     * Subscribes to every topic matching the given {@link Pattern}. Topics can be set only once,
+     * either with {@link #setTopics} or with a topic pattern.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can
+     *     pick a certain type of topics.
+     *     <ul>
+     *       <li>PersistentOnly: only subscribe to persistent topics.
+     *       <li>NonPersistentOnly: only subscribe to non-persistent topics.
+     *       <li>AllTopics: subscribe to both persistent and non-persistent topics.
+     *     </ul>
+     *
+     * @return this PulsarSourceBuilder.
+     * @throws IllegalStateException if topics or a topic pattern were already set.
+     */
+    public PulsarSourceBuilder<OUT> setTopicPattern(
+            Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
+        ensureSubscriberIsNull("topic pattern");
+        this.subscriber =
+                PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode);
+        return this;
+    }
+    /**
+     * Sets the consumer name. The name is informative and helps identify a particular consumer
+     * instance in the topic stats. If the name contains no {@code %s}, {@link #build()} appends
+     * {@code " - %s"} to it.
+     *
+     * @param consumerName the human-readable consumer name.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
+        configBuilder.set(PULSAR_CONSUMER_NAME, consumerName);
+        return this;
+    }
+    /**
+     * Sets a custom topic range generator for a Key_Shared subscription. If no subscription type
+     * has been configured yet, it is switched to {@link SubscriptionType#Key_Shared}.
+     *
+     * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given
+     *     topic.
+     * @return this PulsarSourceBuilder.
+     * @throws NullPointerException if {@code rangeGenerator} is null.
+     * @throws IllegalArgumentException if a subscription type other than Key_Shared was already
+     *     configured.
+     */
+    public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator) {
+        // Validate the argument before touching the configuration, so a rejected call leaves the
+        // builder unchanged.
+        RangeGenerator generator = checkNotNull(rangeGenerator);
+        if (configBuilder.contains(PULSAR_SUBSCRIPTION_TYPE)) {
+            SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
+            checkArgument(
+                    subscriptionType == SubscriptionType.Key_Shared,
+                    "Key_Shared subscription should be used for custom rangeGenerator instead of %s",
+                    subscriptionType);
+        } else {
+            LOG.warn("No subscription type provided, set it to Key_Shared.");
+            setSubscriptionType(SubscriptionType.Key_Shared);
+        }
+        this.rangeGenerator = generator;
+        return this;
+    }
+    /**
+     * Chooses the position the PulsarSource starts consuming from.
+     *
+     * @param startCursor set the starting offsets for the Source.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
+        checkNotNull(startCursor);
+        this.startCursor = startCursor;
+        return this;
+    }
+    /**
+     * By default, the PulsarSource runs in an {@link 
Boundedness#CONTINUOUS_UNBOUNDED} mode and
+     * never stop until the Flink job is canceled or fails. To let the 
PulsarSource run in {@link
+     * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one 
can call {@link
+     * #setUnboundedStopCursor(StopCursor)} and disable auto partition 
discovery as described below.
+     *
+     * <p>This method is different from {@link 
#setBoundedStopCursor(StopCursor)} that after setting
+     * the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will still
+     * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will 
stop at the stopping
+     * offsets specified by the stopping offsets {@link StopCursor}.
+     *
+     * <p>To stop the connector user has to disable the auto partition 
discovery. As auto partition
+     * discovery always expected new splits to come and not exiting. To 
disable auto partition
+     * discovery, use builder.setConfig({@link
+     *
+     * @param stopCursor The {@link StopCursor} to specify the stopping offset.
+     * @return this PulsarSourceBuilder.
+     * @see #setBoundedStopCursor(StopCursor)
+     */
+    public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor 
stopCursor) {
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.stopCursor = checkNotNull(stopCursor);
+        return this;
+    }
+    /**
+     * By default, the PulsarSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the PulsarSource run in
+     * {@link Boundedness#BOUNDED} manner and stops at some point, one can set 
an {@link StopCursor}
+     * to specify the stopping offsets for each partition. When all the 
partitions have reached
+     * their stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setUnboundedStopCursor(StopCursor)} that after
+     * setting the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will
+     * return {@link Boundedness#BOUNDED} instead of {@link 
+     *
+     * @param stopCursor the {@link StopCursor} to specify the stopping 
+     * @return this PulsarSourceBuilder.
+     * @see #setUnboundedStopCursor(StopCursor)
+     */
+    public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor 
stopCursor) {
+        this.boundedness = Boundedness.BOUNDED;
+        this.stopCursor = checkNotNull(stopCursor);
+        return this;
+    }
+    /**
+     * DeserializationSchema is required for getting the {@link Schema} for deserializing messages
+     * from Pulsar and for getting the {@link TypeInformation} for message serialization in Flink.
+     *
+     * <p>We have defined a set of implementations; use {@code
+     * PulsarDeserializationSchema#pulsarSchema} or {@code PulsarDeserializationSchema#flinkSchema}
+     * to create the desired schema.
+     *
+     * @param deserializationSchema the schema used to deserialize Pulsar messages.
+     * @param <T> the record type produced by the schema, which narrows the builder's output type.
+     * @return this builder, re-typed to produce {@code T}.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            PulsarDeserializationSchema<T> deserializationSchema) {
+        PulsarSourceBuilder<T> self = specialized();
+        self.deserializationSchema = deserializationSchema;
+        return self;
+    }
+    /**
+     * Sets a single option for the PulsarSource and its Pulsar consumer. Valid keys are declared
+     * in {@link PulsarSourceOptions} and {@link PulsarOptions}. Each option should be set only once,
+     * or always with the same value.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this PulsarSourceBuilder.
+     */
+    public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value) {
+        this.configBuilder.set(key, value);
+        return this;
+    }
+    /**
+     * Copies every option in the given Flink {@link Configuration} into this builder. Valid keys
+     * are declared in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * @param config the config to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setConfig(Configuration config) {
+        this.configBuilder.set(config);
+        return this;
+    }
+    /**
+     * Copies arbitrary {@link Properties} into this builder. Valid keys are declared in {@link
+     * PulsarSourceOptions} and {@link PulsarOptions}. Intended mainly for future Flink SQL binding.
+     *
+     * @param properties the config properties to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setProperties(Properties properties) {
+        this.configBuilder.set(properties);
+        return this;
+    }
+    /**
+     * Build the {@link PulsarSource}.
+     *
+     * <p>Besides checking the required settings, this method normalizes the configuration:
+     *
+     * <ul>
+     *   <li>it picks a range generator that matches the subscription type;
+     *   <li>it disables partition discovery for bounded sources;
+     *   <li>it enables transactions when auto-acknowledgement is off for Shared/Key_Shared;
+     *   <li>it appends {@code " - %s"} to a consumer name that lacks a {@code %s} placeholder.
+     * </ul>
+     *
+     * @return a PulsarSource with the settings made for this builder.
+     * @throws NullPointerException if no topics/topic pattern or no deserialization schema is set.
+     * @throws IllegalStateException if a cursor or the range generator is not serializable.
+     */
+    @SuppressWarnings("java:S3776")
+    public PulsarSource<OUT> build() {
+        // Ensure the topic subscriber for pulsar.
+        checkNotNull(subscriber, "No topic names or topic pattern are provided.");
+        SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
+        if (subscriptionType == SubscriptionType.Key_Shared) {
+            if (rangeGenerator == null) {
+                LOG.warn(
+                        "No range generator provided for key_shared subscription,"
+                                + " we would use the SplitRangeGenerator as the default range generator.");
+                this.rangeGenerator = new SplitRangeGenerator();
+            }
+        } else {
+            // A custom range generator is only honored for Key_Shared (see setRangeGenerator),
+            // so override it for every other subscription type.
+            this.rangeGenerator = new FullRangeGenerator();
+        }
+        if (boundedness == null) {
+            LOG.warn("No boundedness was set, mark it as a endless stream.");
+            this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        }
+        // A bounded source must not keep discovering new partitions, otherwise it never finishes.
+        if (boundedness == Boundedness.BOUNDED
+                && configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) {
+            LOG.warn(
+                    "{} property is overridden to -1 because the source is bounded.",
+                    PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
+            configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
+        }
+        checkNotNull(deserializationSchema, "deserializationSchema should be set.");
+        // Enable transaction if the cursor auto commit is disabled for Key_Shared & Shared.
+        if (FALSE.equals(configBuilder.get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE))
+                && (subscriptionType == SubscriptionType.Key_Shared
+                        || subscriptionType == SubscriptionType.Shared)) {
+            LOG.info(
+                    "Pulsar cursor auto commit is disabled, make sure checkpoint is enabled "
+                            + "and your pulsar cluster is support the transaction.");
+            configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
+            if (!configBuilder.contains(PULSAR_READ_TRANSACTION_TIMEOUT)) {
+                LOG.warn(
+                        "The default pulsar transaction timeout is 3 hours, "
+                                + "make sure it was greater than your checkpoint interval.");
+            } else {
+                Long timeout = configBuilder.get(PULSAR_READ_TRANSACTION_TIMEOUT);
+                LOG.warn(
+                        "The configured transaction timeout is {} mille seconds, "
+                                + "make sure it was greater than your checkpoint interval.",
+                        timeout);
+            }
+        }
+        if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
+            LOG.warn(
+                    "We recommend set a readable consumer name through setConsumerName(String) in production mode.");
+        } else {
+            String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+            // Guarantee a %s placeholder in the name (presumably filled in per reader - confirm).
+            if (!consumerName.contains("%s")) {
+                configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
+            }
+        }
+        // Since these implementations could be a lambda, make sure they are serializable.
+        checkState(isSerializable(startCursor), "StartCursor isn't serializable");
+        checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
+        checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
+        // Check builder configuration.
+        SourceConfiguration sourceConfiguration =
+                configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);
+        return new PulsarSource<>(
+                sourceConfiguration,
+                subscriber,
+                rangeGenerator,
+                startCursor,
+                stopCursor,
+                boundedness,
+                deserializationSchema);
+    }
+    // ------------- private helpers --------------
+    /**
+     * Re-types this builder so the compiler accepts a narrower output type; the cast is unchecked
+     * because generics are erased at runtime.
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends OUT> PulsarSourceBuilder<T> specialized() {
+        PulsarSourceBuilder<T> narrowed = (PulsarSourceBuilder<T>) this;
+        return narrowed;
+    }
+    /**
+     * Topic names and a topic pattern are mutually exclusive; ensure a subscriber is set only once.
+     *
+     * @param attemptingSubscribeMode description of the subscription being attempted, used in the
+     *     error message.
+     * @throws IllegalStateException if a subscriber has already been configured.
+     */
+    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+        if (subscriber != null) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot use %s for consumption because a %s is already set for consumption.",
+                            attemptingSubscribeMode, subscriber.getClass().getSimpleName()));
+        }
+    }
diff --git 
new file mode 100644
index 0000000000..bbc42c149f
--- /dev/null
@@ -0,0 +1,128 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source;
+import org.apache.inlong.sort.pulsar.source.reader.PulsarOrderedSourceReader;
+import org.apache.inlong.sort.pulsar.source.reader.PulsarUnorderedSourceReader;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import java.util.function.Supplier;
+import static 
+import static 
+/**
+ * Factory that creates the Pulsar source reader matching the configured subscription type.
+ *
+ * <ol>
+ *   <li>Failover, Exclusive: creates a {@link PulsarOrderedSourceReader}, which consumes ordered
+ *       messages.
+ *   <li>Shared, Key_Shared: creates a {@link PulsarUnorderedSourceReader}, which consumes
+ *       unordered messages.
+ * </ol>
+ *
+ * <p>Copied without modification from the upstream flink-connector-pulsar
+ * {@code PulsarSourceReaderFactory} (the original link target was truncated in this copy).
+ */
+public final class PulsarSourceReaderFactory {
+    private PulsarSourceReaderFactory() {
+        // No public constructor.
+    }
+    /**
+     * Creates a source reader for the subscription type in {@code sourceConfiguration}.
+     *
+     * <p>A {@link PulsarClient} and a {@link PulsarAdmin} are created here and passed both to the
+     * returned reader and to every split reader produced by its supplier.
+     *
+     * <p>NOTE(review): the client and admin are created before the subscription type is checked.
+     * If one of the exceptions below is thrown, this method does not close them; the
+     * {@code java:S2095} suppression hides the corresponding resource warning.
+     *
+     * @param readerContext the Flink source reader context
+     * @param deserializationSchema schema that turns Pulsar messages into {@code OUT} records
+     * @param sourceConfiguration the source configuration (subscription type, queue capacity, ...)
+     * @param <OUT> the output record type
+     * @return an ordered reader for Failover/Exclusive, an unordered reader for Shared/Key_Shared
+     * @throws IllegalStateException for Shared/Key_Shared when the client has no transaction
+     *     coordinator and automatic acknowledgement is disabled
+     * @throws UnsupportedOperationException for any other subscription type
+     */
+    @SuppressWarnings("java:S2095")
+    public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(
+            SourceReaderContext readerContext,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            SourceConfiguration sourceConfiguration) {
+        PulsarClient pulsarClient = createClient(sourceConfiguration);
+        PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
+        // Create a message queue with the predefined source option.
+        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>(queueCapacity);
+        // Create different pulsar source reader by subscription type.
+        // NOTE(review): the right-hand side of the next assignment is missing in this copy.
+        SubscriptionType subscriptionType = 
+        if (subscriptionType == SubscriptionType.Failover
+                || subscriptionType == SubscriptionType.Exclusive) {
+            // Create an ordered split reader supplier.
+            Supplier<PulsarOrderedPartitionSplitReader<OUT>> 
splitReaderSupplier =
+                    () -> new PulsarOrderedPartitionSplitReader<>(
+                            pulsarClient,
+                            pulsarAdmin,
+                            sourceConfiguration,
+                            deserializationSchema);
+            return new PulsarOrderedSourceReader<>(
+                    elementsQueue,
+                    splitReaderSupplier,
+                    readerContext,
+                    sourceConfiguration,
+                    pulsarClient,
+                    pulsarAdmin,
+                    deserializationSchema);
+        } else if (subscriptionType == SubscriptionType.Shared
+                || subscriptionType == SubscriptionType.Key_Shared) {
+            // Unordered consumption needs a transaction coordinator unless messages are
+            // acknowledged automatically; the coordinator may therefore be null below.
+            TransactionCoordinatorClient coordinatorClient =
+                    ((PulsarClientImpl) pulsarClient).getTcClient();
+            if (coordinatorClient == null
+                    && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+                throw new IllegalStateException("Transaction is required but 
didn't enabled");
+            }
+            Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
splitReaderSupplier =
+                    () -> new PulsarUnorderedPartitionSplitReader<>(
+                            pulsarClient,
+                            pulsarAdmin,
+                            sourceConfiguration,
+                            deserializationSchema,
+                            coordinatorClient);
+            return new PulsarUnorderedSourceReader<>(
+                    elementsQueue,
+                    splitReaderSupplier,
+                    readerContext,
+                    sourceConfiguration,
+                    pulsarClient,
+                    pulsarAdmin,
+                    coordinatorClient,
+                    deserializationSchema);
+        } else {
+            // NOTE(review): this message reads "This subscription type is not <type> supported
+            // currently."; the word order looks off but the text is kept verbatim here.
+            throw new UnsupportedOperationException(
+                    "This subscription type is not " + subscriptionType + " 
supported currently.");
+        }
+    }
diff --git 
new file mode 100644
index 0000000000..f535082ea2
--- /dev/null
@@ -0,0 +1,220 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+/**
+ * The source reader for Pulsar Failover and Exclusive subscriptions, which consume ordered
+ * messages.
+ *
+ * <p>Besides acknowledging cursors when a checkpoint completes, this copy notifies a
+ * {@link PulsarTableDeserializationSchema} about checkpoints so that audit information can be
+ * flushed on checkpoint completion. (The original "copy from" link target was truncated in this
+ * copy.)
+ */
+public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT> {
+    // NOTE(review): the initializer of LOG is truncated in this copy.
+    private static final Logger LOG = 
+    /** Cursors captured in snapshotState, keyed by checkpoint id, awaiting acknowledgement. */
+    @VisibleForTesting
+    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;
+    // Last consumed message ids of finished splits. The declaration is truncated in this copy;
+    // the constructor assigns it as cursorsOfFinishedSplits.
+    private final ConcurrentMap<TopicPartition, MessageId> 
+    /** First failure raised while acknowledging cursors; rethrown from pollNext. */
+    private final AtomicReference<Throwable> cursorCommitThrowable = new 
+    /** Record deserializer; also receives the checkpoint and audit callbacks. */
+    private final PulsarDeserializationSchema<OUT> deserializationSchema;
+    /** Periodic cursor acknowledger, created only when automatic acknowledgement is enabled. */
+    private ScheduledExecutorService cursorScheduler;
+    /**
+     * Creates the reader.
+     *
+     * <p>NOTE(review): this copy is missing the {@code elementsQueue} parameter (used below and
+     * passed first by the factory) and the name of the split-reader supplier parameter; several
+     * argument lines are truncated as well.
+     */
+    public PulsarOrderedSourceReader(
+            Supplier<PulsarOrderedPartitionSplitReader<OUT>> 
+            SourceReaderContext context,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        super(
+                elementsQueue,
+                new PulsarOrderedFetcherManager<>(elementsQueue, 
+                context,
+                sourceConfiguration,
+                pulsarClient,
+                pulsarAdmin);
+        this.cursorsToCommit = Collections.synchronizedSortedMap(new 
+        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.deserializationSchema = deserializationSchema;
+    }
+    /**
+     * Starts the reader. When automatic acknowledgement is enabled, also schedules
+     * {@link #cumulativeAcknowledgmentMessage()} at a fixed rate. The first run is after the max
+     * fetch time; later runs follow every auto-commit cursor interval (in milliseconds).
+     */
+    @Override
+    public void start() {
+        super.start();
+        if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+            this.cursorScheduler = 
+            // Auto commit cursor, this could be enabled when checkpoint is also enabled.
+            cursorScheduler.scheduleAtFixedRate(
+                    this::cumulativeAcknowledgmentMessage,
+                    sourceConfiguration.getMaxFetchTime().toMillis(),
+                    sourceConfiguration.getAutoCommitCursorInterval(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+    /** Rethrows any recorded acknowledgement failure, then polls the next record. */
+    @Override
+    public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
+        checkErrorAndRethrow();
+        return super.pollNext(output);
+    }
+    /**
+     * Remembers the last consumed message id of each finished split, so that snapshotState adds
+     * it to the cursors acknowledged with the next checkpoint.
+     */
+    @Override
+    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> 
finishedSplitIds) {
+        // We don't require new splits; all splits are pre-assigned by the source enumerator.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+        }
+        for (Map.Entry<String, PulsarPartitionSplitState> entry : 
finishedSplitIds.entrySet()) {
+            PulsarPartitionSplitState state = entry.getValue();
+            MessageId latestConsumedId = state.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                // NOTE(review): the value argument of this put is truncated in this copy.
+                cursorsOfFinishedSplits.put(state.getPartition(), 
+            }
+        }
+    }
+    /**
+     * Snapshots the split states and records, under {@code checkpointId}, the cursors to
+     * acknowledge once the checkpoint completes. These are the latest consumed id of every active
+     * split plus the cursors of all finished splits.
+     *
+     * <p>Before snapshotting, a {@link PulsarTableDeserializationSchema} is informed of the
+     * checkpoint. NOTE(review): that call and the opening brace of its if statement are truncated
+     * in this copy; presumably it is {@code updateCurrentCheckpointId(checkpointId)}. Confirm
+     * against the original file.
+     */
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
+            ((PulsarTableDeserializationSchema) 
+        }
+        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
+        // Perform a snapshot for these splits.
+        Map<TopicPartition, MessageId> cursors =
+                cursorsToCommit.computeIfAbsent(checkpointId, id -> new 
+        // Put the cursors of the active splits.
+        for (PulsarPartitionSplit split : splits) {
+            MessageId latestConsumedId = split.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursors.put(split.getPartition(), latestConsumedId);
+            }
+        }
+        // Put cursors of all the finished splits.
+        cursors.putAll(cursorsOfFinishedSplits);
+        return splits;
+    }
+    /**
+     * Acknowledges the cursors recorded for {@code checkpointId}. It then drops the bookkeeping
+     * for every checkpoint up to and including it, and flushes audit data of a
+     * {@link PulsarTableDeserializationSchema}.
+     *
+     * <p>Failures are not thrown here. The first one is stored and rethrown by the next
+     * {@link #pollNext} call. The audit flush only runs when acknowledgement succeeds.
+     *
+     * <p>NOTE(review): {@code PulsarTableDeserializationSchema#updateLastCheckpointId} is never
+     * called before {@code flushAudit()} in this class. Confirm whether the flush depends on it.
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
+        // NOTE(review): the next assignment and the acknowledge call are truncated in this copy.
+        Map<TopicPartition, MessageId> cursors = 
+        try {
+            ((PulsarOrderedFetcherManager<OUT>) 
+            LOG.debug("Successfully acknowledge cursors for checkpoint {}", 
+            // Clean up the cursors.
+            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+            cursorsToCommit.headMap(checkpointId + 1).clear();
+            if (deserializationSchema instanceof 
PulsarTableDeserializationSchema) {
+                PulsarTableDeserializationSchema 
pulsarTableDeserializationSchema =
+                        (PulsarTableDeserializationSchema) 
+                pulsarTableDeserializationSchema.flushAudit();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to acknowledge cursors for checkpoint {}", 
checkpointId, e);
+            // Keep only the first failure; pollNext rethrows it.
+            cursorCommitThrowable.compareAndSet(null, e);
+        }
+    }
+    /** Stops the auto-acknowledge scheduler, if one was started, then closes the base reader. */
+    @Override
+    public void close() throws Exception {
+        if (cursorScheduler != null) {
+            cursorScheduler.shutdown();
+        }
+        super.close();
+    }
+    // ----------------- helper methods --------------
+    /**
+     * Rethrows, wrapped in a RuntimeException, the first failure recorded by
+     * notifyCheckpointComplete or by the auto-acknowledge task.
+     */
+    private void checkErrorAndRethrow() {
+        Throwable cause = cursorCommitThrowable.get();
+        if (cause != null) {
+            throw new RuntimeException("An error occurred in acknowledge 
message.", cause);
+        }
+    }
+    /**
+     * Acknowledges the cursor of each Pulsar topic partition up to its last consumed message id.
+     * Runs on {@link #cursorScheduler} when automatic acknowledgement is enabled. Failures are
+     * recorded in {@link #cursorCommitThrowable} and rethrown from pollNext.
+     */
+    private void cumulativeAcknowledgmentMessage() {
+        Map<TopicPartition, MessageId> cursors = new 
+        // We reuse snapshotState for acquiring a consume status snapshot. No checkpoint really
+        // happens, so a fake checkpoint id is passed. Calling super.snapshotState (not this
+        // class's override) leaves cursorsToCommit and the schema's checkpoint hook untouched.
+        List<PulsarPartitionSplit> splits = super.snapshotState(1L);
+        for (PulsarPartitionSplit split : splits) {
+            MessageId latestConsumedId = split.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursors.put(split.getPartition(), latestConsumedId);
+            }
+        }
+        try {
+            ((PulsarOrderedFetcherManager<OUT>) 
+            // Clean up the finish splits.
+            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+        } catch (Exception e) {
+            LOG.error("Fail in auto cursor commit.", e);
+            cursorCommitThrowable.compareAndSet(null, e);
+        }
+    }
diff --git 
new file mode 100644
index 0000000000..f7e6bafc1d
--- /dev/null
@@ -0,0 +1,87 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+/**
+ * The common Pulsar source reader for both ordered and unordered message consumption. It holds
+ * the shared {@link PulsarClient} and {@link PulsarAdmin} and closes them in {@link #close()}.
+ *
+ * <p>Copied without modification from the upstream flink-connector-pulsar
+ * {@code PulsarSourceReaderBase} (the original link target was truncated in this copy).
+ *
+ * @param <OUT> The output message type for flink.
+ */
+abstract class PulsarSourceReaderBase<OUT>
+        extends
+            SourceReaderBase<PulsarMessage<OUT>, OUT, PulsarPartitionSplit, 
PulsarPartitionSplitState> {
+    protected final SourceConfiguration sourceConfiguration;
+    protected final PulsarClient pulsarClient;
+    protected final PulsarAdmin pulsarAdmin;
+    /**
+     * Creates the base reader with a {@link PulsarRecordEmitter}.
+     *
+     * <p>NOTE(review): {@code elementsQueue} is passed to super but is not declared in the
+     * visible parameter list; the parameter line appears to be missing from this copy.
+     */
+    protected PulsarSourceReaderBase(
+            PulsarFetcherManagerBase<OUT> splitFetcherManager,
+            SourceReaderContext context,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin) {
+        super(
+                elementsQueue,
+                splitFetcherManager,
+                new PulsarRecordEmitter<>(),
+                sourceConfiguration,
+                context);
+        this.sourceConfiguration = sourceConfiguration;
+        this.pulsarClient = pulsarClient;
+        this.pulsarAdmin = pulsarAdmin;
+    }
+    /** Wraps a split in a new {@link PulsarPartitionSplitState}. */
+    @Override
+    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit 
split) {
+        return new PulsarPartitionSplitState(split);
+    }
+    /** Converts a split state back into a {@link PulsarPartitionSplit}; {@code splitId} is unused. */
+    @Override
+    protected PulsarPartitionSplit toSplitType(
+            String splitId, PulsarPartitionSplitState splitState) {
+        return splitState.toPulsarPartitionSplit();
+    }
+    /**
+     * Closes the reader, then the shared Pulsar client and admin.
+     *
+     * <p>NOTE(review): if {@code super.close()} throws, neither the client nor the admin is
+     * closed, and if {@code pulsarClient.shutdown()} throws, the admin is not closed.
+     */
+    @Override
+    public void close() throws Exception {
+        // Close all the consumers first.
+        super.close();
+        // Close shared pulsar resources.
+        pulsarClient.shutdown();
+        pulsarAdmin.close();
+    }
diff --git 
new file mode 100644
index 0000000000..2ccf74fe3a
--- /dev/null
@@ -0,0 +1,192 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import static;
+ * The source reader for pulsar subscription Shared and Key_Shared, which 
consumes the unordered
+ * messages.
+ * copy from {@link 
+ */
+public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT> {
+    private static final Logger LOG = 
+    @Nullable
+    private final TransactionCoordinatorClient coordinatorClient;
+    private final SortedMap<Long, List<TxnID>> transactionsToCommit;
+    private final List<TxnID> transactionsOfFinishedSplits;
+    private final PulsarDeserializationSchema<OUT> deserializationSchema;
+    private boolean started = false;
+    public PulsarUnorderedSourceReader(
+            Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
+            SourceReaderContext context,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            @Nullable TransactionCoordinatorClient coordinatorClient,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        super(
+                elementsQueue,
+                new PulsarUnorderedFetcherManager<>(elementsQueue, 
+                context,
+                sourceConfiguration,
+                pulsarClient,
+                pulsarAdmin);
+        this.coordinatorClient = coordinatorClient;
+        this.transactionsToCommit = Collections.synchronizedSortedMap(new 
+        this.transactionsOfFinishedSplits = Collections.synchronizedList(new 
+        this.deserializationSchema = deserializationSchema;
+    }
+    @Override
+    public void start() {
+        this.started = true;
+        super.start();
+    }
+    @Override
+    public void addSplits(List<PulsarPartitionSplit> splits) {
+        if (started) {
+            // We only accept splits after this reader is started and 
registered to the pipeline.
+            // This would ignore the splits from the state.
+            super.addSplits(splits);
+        } else {
+            // Abort the pending transaction in this split.
+            for (PulsarPartitionSplit split : splits) {
+      "Ignore the split {} saved in checkpoint.", split);
+                TxnID transactionId = split.getUncommittedTransactionId();
+                if (transactionId != null && coordinatorClient != null) {
+                    try {
+                        coordinatorClient.abort(transactionId);
+                    } catch (Exception e) {
+                        LOG.debug(
+                                "Error in aborting transaction {} from the 
+                                transactionId,
+                                e);
+                    }
+                }
+            }
+        }
+    }
+    @Override
+    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> 
finishedSplitIds) {
+        // We don't require new splits, all the splits are pre-assigned by 
source enumerator.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+        }
+        if (coordinatorClient != null) {
+            // Commit the uncommitted transaction
+            for (Map.Entry<String, PulsarPartitionSplitState> entry : 
finishedSplitIds.entrySet()) {
+                PulsarPartitionSplitState state = entry.getValue();
+                TxnID uncommittedTransactionId = 
+                if (uncommittedTransactionId != null) {
+                    transactionsOfFinishedSplits.add(uncommittedTransactionId);
+                }
+            }
+        }
+    }
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+        LOG.debug("Trigger the new transaction for downstream readers.");
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
+            ((PulsarTableDeserializationSchema) 
+        }
+        List<PulsarPartitionSplit> splits =
+                ((PulsarUnorderedFetcherManager<OUT>) 
+        if (coordinatorClient == null) {
+            return splits;
+        }
+        // Snapshot the transaction status and commit it after checkpoint 
+        List<TxnID> txnIDs =
+                transactionsToCommit.computeIfAbsent(checkpointId, id -> new 
+        for (PulsarPartitionSplit split : splits) {
+            TxnID uncommittedTransactionId = 
+            if (uncommittedTransactionId != null) {
+                txnIDs.add(uncommittedTransactionId);
+            }
+        }
+        return splits;
+    }
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.debug("Committing transactions for checkpoint {}", checkpointId);
+        if (coordinatorClient == null) {
+            return;
+        }
+        List<Long> checkpointIds =
+                transactionsToCommit.keySet().stream()
+                        .filter(id -> id <= checkpointId)
+                        .collect(toList());
+        for (Long id : checkpointIds) {
+            List<TxnID> transactions = transactionsToCommit.remove(id);
+            if (transactions != null) {
+                for (TxnID transaction : transactions) {
+                    coordinatorClient.commit(transaction);
+                    transactionsOfFinishedSplits.remove(transaction);
+                }
+            }
+        }
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
+            PulsarTableDeserializationSchema pulsarTableDeserializationSchema =
+                    (PulsarTableDeserializationSchema) deserializationSchema;
+            pulsarTableDeserializationSchema.flushAudit();
+        }
+    }
diff --git 
index 2dae39e828..554c81a33f 100644
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.pulsar.table;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -59,7 +59,7 @@ public class PulsarTableDeserializationSchema implements 
     private final boolean innerFormat;
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
     private MetricOption metricOption;
@@ -90,7 +90,7 @@ public class PulsarTableDeserializationSchema implements 
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
@@ -98,7 +98,6 @@ public class PulsarTableDeserializationSchema implements 
     public void deserialize(Message<byte[]> message, Collector<RowData> 
             throws IOException {
         // Get the key row data
         List<RowData> keyRowData = new ArrayList<>();
         if (keyDeserialization != null) {
@@ -114,7 +113,7 @@ public class PulsarTableDeserializationSchema implements 
         MetricsCollector<RowData> metricsCollector =
-                new MetricsCollector<>(new ListCollector<>(valueRowData), 
+                new MetricsCollector<>(new ListCollector<>(valueRowData), 
         // reset timestamp if the deserialize schema has not inner format
         if (!innerFormat) {
@@ -131,4 +130,22 @@ public class PulsarTableDeserializationSchema implements 
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
+    /**
+     * Delegates to {@code SourceExactlyMetric#flushAudit()}. Does nothing when no metric was
+     * created, which is the case when no metric option was configured.
+     */
+    public void flushAudit() {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.flushAudit();
+    }
+    /**
+     * Forwards the id of the checkpoint being taken to the exactly-once source metric; a no-op
+     * when no metric was created.
+     *
+     * @param checkpointId id of the checkpoint currently being snapshotted
+     */
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+    }
+    /**
+     * Forwards a checkpoint id to {@code SourceExactlyMetric#updateLastCheckpointId}; a no-op
+     * when no metric was created.
+     *
+     * @param checkpointId checkpoint id to record as the last one
+     */
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+    }
\ No newline at end of file
diff --git 
index 84a076d502..e5df548650 100644
@@ -17,8 +17,9 @@
 package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.pulsar.source.PulsarSource;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
diff --git a/licenses/inlong-sort-connectors/LICENSE 
index 126eebbdc7..e9b2f0b4cd 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -804,6 +804,12 @@
  Source  : flink-connector-pulsar 4.0-SNAPSHOT (Please note that the software 
have been modified.)
  License :

Reply via email to