syhily commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r687535229



##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+
+/**
+ * The source reader for the Pulsar Shared and Key_Shared subscriptions, which consumes
+ * unordered messages.
+ */
+@Internal
+public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);
+
+    private final TransactionCoordinatorClient coordinatorClient;
+    private final SortedMap<Long, List<TxnID>> transactionsToCommit;
+    private final List<TxnID> transactionsOfFinishedSplits;
+
+    public PulsarUnorderedSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
+            Supplier<PulsarUnorderedPartitionSplitReader<OUT>> splitReaderSupplier,
+            Configuration configuration,
+            SourceReaderContext context,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            TransactionCoordinatorClient coordinatorClient) {
+        super(
+                elementsQueue,
+                new PulsarUnorderedFetcherManager<>(elementsQueue, splitReaderSupplier::get),
+                configuration,
+                context,
+                sourceConfiguration,
+                pulsarClient,
+                pulsarAdmin);
+
+        this.coordinatorClient = coordinatorClient;
+        this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+        this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
+        // We don't require new splits; all the splits are pre-assigned by the source enumerator.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+        }
+
+        for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) {
+            PulsarPartitionSplitState state = entry.getValue();
+            TxnID uncommittedTransactionId = state.getUncommittedTransactionId();
+            if (uncommittedTransactionId != null) {
+                transactionsOfFinishedSplits.add(uncommittedTransactionId);
+            }
+        }
+    }
+
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {

Review comment:
       Yes, that looks way better. I have dropped the `clear` action on empty collections.
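   
   For reference, a rough sketch of what the revised method could look like (just a sketch; it assumes the split exposes `getUncommittedTransactionId()` like the split state above):

```java
@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
    List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);

    // Record the uncommitted transactions for this checkpoint. No clear()
    // is issued on the (possibly empty) collections anymore.
    List<TxnID> txnIDs =
            transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
    for (PulsarPartitionSplit split : splits) {
        TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
        if (uncommittedTransactionId != null) {
            txnIDs.add(uncommittedTransactionId);
        }
    }
    txnIDs.addAll(transactionsOfFinishedSplits);

    return splits;
}
```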

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+
+/**
+ * The split reader for a given {@link PulsarPartitionSplit}; it would be closed once the {@link
+ * PulsarOrderedSourceReader} is closed.
+ *
+ * @param <OUT> the type of the record deserialized from the Pulsar message and emitted downstream.
+ */
+@Internal
+public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplitReaderBase<OUT> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);
+
+    public PulsarOrderedPartitionSplitReader(
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            Configuration configuration,
+            SourceConfiguration sourceConfiguration,
+            ConfigurationDataCustomizer<ConsumerConfigurationData<byte[]>>
+                    consumerConfigurationCustomizer,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        super(
+                pulsarClient,
+                pulsarAdmin,
+                configuration,
+                sourceConfiguration,
+                consumerConfigurationCustomizer,
+                deserializationSchema);
+    }
+
+    @Override
+    protected Message<byte[]> pollMessage(Duration timeout)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        return pulsarConsumer.receiveAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);

Review comment:
       > Okay, thanks for the explanation. Just a last question for clarification: Even if we don't use `batchReceiveAsync`, there is some internal batching in Pulsar, such that `receiveAsync` rarely involves real I/O.
   
   Yes, you are right. The message queue is defined in [ConsumerBase](https://github.com/apache/pulsar/blob/9088294a6410ff2e15d99f729104700faeccae4f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L77) and is updated by the [client cnx](https://github.com/apache/pulsar/blob/9088294a6410ff2e15d99f729104700faeccae4f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L447).
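   
   To illustrate, a minimal standalone sketch against the stock Pulsar client; the knob for that internal pre-fetching is `receiverQueueSize` (the service URL, topic, and subscription name below are placeholders):

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class ReceiverQueueExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build();

        // The client pre-fetches messages into the consumer's local incoming
        // queue (bounded by receiverQueueSize), so receiveAsync() is usually
        // completed from memory without a network round trip.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("some-topic") // placeholder topic
                .subscriptionName("some-subscription") // placeholder subscription
                .receiverQueueSize(1000)
                .subscribe();

        Message<byte[]> message = consumer.receiveAsync().get(100, TimeUnit.MILLISECONDS);
        consumer.acknowledge(message);

        consumer.close();
        client.close();
    }
}
```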

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/NoneSchemaFactory.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.schema.factories;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchemaFactory;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * If the user didn't specify a schema, the type would be {@link SchemaType#NONE}; we would just
+ * use a byte array.
+ */
+public class NoneSchemaFactory implements PulsarSchemaFactory<byte[]> {

Review comment:
       OK. But the `PrimitiveSchemaFactory` would need an extra ctor for this schema type.
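   
   Roughly what I mean, as a hypothetical sketch (the field names are assumptions about `PrimitiveSchemaFactory`'s internals):

```java
/**
 * Hypothetical extra ctor: accept an explicit SchemaType (e.g. NONE) instead
 * of deriving it from the schema itself.
 */
public PrimitiveSchemaFactory(
        SchemaType type, Schema<T> schema, TypeInformation<T> typeInformation) {
    this.type = type;
    this.schema = schema;
    this.typeInformation = typeInformation;
}

// The existing ctor could then delegate to it:
public PrimitiveSchemaFactory(Schema<T> schema, TypeInformation<T> typeInformation) {
    this(schema.getSchemaInfo().getType(), schema, typeInformation);
}
```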

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** The state class for recording the split assignment. */
+@Internal
+public class SplitsAssignmentState implements Serializable {
+    private static final long serialVersionUID = -3244274150389546170L;
+
+    private final SerializableSupplier<StartCursor> startCursorSupplier;
+    private final SerializableSupplier<StopCursor> stopCursorSupplier;
+    private final SourceConfiguration sourceConfiguration;
+
+    // The dynamic states for checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private final Map<String, Set<Integer>> splitReaderMapping;
+    private boolean initialized;
+
+    public SplitsAssignmentState(
+            SerializableSupplier<StartCursor> startCursorSupplier,
+            SerializableSupplier<StopCursor> stopCursorSupplier,
+            SourceConfiguration sourceConfiguration) {
+        this.startCursorSupplier = startCursorSupplier;
+        this.stopCursorSupplier = stopCursorSupplier;
+        this.sourceConfiguration = sourceConfiguration;
+        this.appendedPartitions = new HashSet<>();
+        this.pendingPartitionSplits = new HashSet<>();
+        this.splitReaderMapping = new HashMap<>();
+        this.initialized = false;
+    }
+
+    public SplitsAssignmentState(
+            SerializableSupplier<StartCursor> startCursorSupplier,
+            SerializableSupplier<StopCursor> stopCursorSupplier,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.startCursorSupplier = startCursorSupplier;
+        this.stopCursorSupplier = stopCursorSupplier;
+        this.sourceConfiguration = sourceConfiguration;
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
+        this.splitReaderMapping = sourceEnumState.getSplitReaderMapping();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions, pendingPartitionSplits, splitReaderMapping, initialized);
+    }
+
+    /**
+     * Append the newly fetched partitions to the current state. We generate a pending source
+     * split for the downstream Pulsar readers. Since the {@link SplitEnumeratorContext} doesn't
+     * support putting a split back to the enumerator, we don't support partition deletion.
+     *
+     * @param fetchedPartitions The partitions from the {@link PulsarSubscriber}.
+     */
+    public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        for (TopicPartition fetchedPartition : fetchedPartitions) {
+            if (!appendedPartitions.contains(fetchedPartition)) {
+                // This is a new topic partition.
+                PulsarPartitionSplit split =
+                        new PulsarPartitionSplit(
+                                fetchedPartition,
+                                startCursorSupplier.get(),

Review comment:
       `SerializationUtils` is a class from commons-lang3; using `InstantiationUtil.clone` would be better.
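   
   For example, the clone here could look like this (a sketch only; it assumes `StartCursor` is `Serializable`, which it should be given the `SerializableSupplier`):

```java
import java.io.IOException;

import org.apache.flink.util.InstantiationUtil;

// Deep-copy the start cursor with Flink's own serialization utility instead
// of commons-lang3's SerializationUtils, so every new split gets an
// independent StartCursor instance.
StartCursor startCursor;
try {
    startCursor = InstantiationUtil.clone(startCursorSupplier.get());
} catch (IOException | ClassNotFoundException e) {
    throw new IllegalStateException("Failed to clone the start cursor.", e);
}
```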

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.split;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessageCollector;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Collections.singleton;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
+import static org.apache.flink.connector.pulsar.source.config.CursorVerification.WARN_ON_MISMATCH;
+import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumer;
+
+/**
+ * The common partition split reader.
+ *
+ * @param <OUT> the type of the record deserialized from the Pulsar message and emitted downstream.
+ */
+abstract class PulsarPartitionSplitReaderBase<OUT>
+        implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);
+
+    protected final PulsarClient pulsarClient;
+    protected final PulsarAdmin pulsarAdmin;
+    protected final Configuration configuration;
+    protected final SourceConfiguration sourceConfiguration;
+    protected final ConfigurationDataCustomizer<ConsumerConfigurationData<byte[]>>
+            consumerConfigurationCustomizer;
+    protected final PulsarDeserializationSchema<OUT> deserializationSchema;
+    protected final AtomicBoolean wakeup;
+
+    protected Consumer<byte[]> pulsarConsumer;
+    protected PulsarPartitionSplit registeredSplit;
+
+    protected PulsarPartitionSplitReaderBase(
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            Configuration configuration,
+            SourceConfiguration sourceConfiguration,
+            ConfigurationDataCustomizer<ConsumerConfigurationData<byte[]>>
+                    consumerConfigurationCustomizer,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        this.pulsarClient = pulsarClient;
+        this.pulsarAdmin = pulsarAdmin;
+        this.configuration = configuration;
+        this.sourceConfiguration = sourceConfiguration;
+        this.consumerConfigurationCustomizer = consumerConfigurationCustomizer;
+        this.deserializationSchema = deserializationSchema;
+        this.wakeup = new AtomicBoolean(false);
+    }
+
+    @Override
+    public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws IOException {
+        RecordsBySplits.Builder<PulsarMessage<OUT>> builder = new RecordsBySplits.Builder<>();
+
+        // Return early when no split is registered to this reader.
+        if (pulsarConsumer == null || registeredSplit == null) {
+            return builder.build();
+        }
+
+        // Reset the wakeup flag before starting to consume.
+        wakeup.compareAndSet(true, false);
+
+        StopCursor stopCursor = registeredSplit.getStopCursor();
+        String splitId = registeredSplit.splitId();
+        PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
+        Deadline deadline = Deadline.fromNow(sourceConfiguration.getMaxFetchTime());
+
+        // Consume messages from Pulsar until this reader is woken up by the Flink source reader.
+        for (int messageNum = 0;
+                messageNum < sourceConfiguration.getMaxFetchRecords()
+                        && deadline.hasTimeLeft()
+                        && isNotWakeup();
+                messageNum++) {
+            try {
+                Duration timeout = deadline.timeLeftIfAny();
+                Message<byte[]> message = pollMessage(timeout);
+
+                if (stopCursor.shouldStop(message)) {
+                    builder.addFinishedSplit(splitId);
+                    break;
+                }
+
+                // Deserialize message.
+                collector.setMessage(message);
+                deserializationSchema.deserialize(message, collector);
+
+                // Acknowledge the message if needed.
+                finishedPollMessage(message);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (TimeoutException e) {
+                break;
+            } catch (ExecutionException e) {
+                LOG.error("Error in polling message from pulsar consumer.", e);
+                break;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
+        LOG.debug("Handle split changes {}", splitsChanges);
+
+        // Get all the partition assignments and stopping offsets.
+        if (!(splitsChanges instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChanges.getClass()));
+        }
+
+        if (registeredSplit != null) {
+            throw new IllegalStateException("This split reader already has an assigned split.");
+        }
+
+        List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
+        Preconditions.checkArgument(
+                newSplits.size() == 1, "This pulsar split reader only supports one split.");
+        PulsarPartitionSplit newSplit = newSplits.get(0);
+
+        // Create pulsar consumer.
+        Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
+
+        // Open start & stop cursor.
+        newSplit.open(configuration, sourceConfiguration);
+
+        // Start Consumer.
+        startConsumer(newSplit, consumer);
+
+        LOG.info("Register split {} consumer for current reader.", newSplit);
+        this.registeredSplit = newSplit;
+        this.pulsarConsumer = consumer;
+    }
+
+    @Override
+    public void wakeUp() {
+        wakeup.compareAndSet(false, true);
+    }
+
+    @Override
+    public void close() {
+        if (pulsarConsumer != null) {
+            sneakyClient(() -> pulsarConsumer.close());
+        }
+    }
+
+    protected abstract Message<byte[]> pollMessage(Duration timeout)
+            throws ExecutionException, InterruptedException, TimeoutException;
+
+    protected abstract void finishedPollMessage(Message<byte[]> message);
+
+    protected abstract void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer);
+
+    // --------------------------- Helper Methods -----------------------------
+
+    protected void initialStartPosition(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+        StartCursor startCursor = split.getStartCursor();
+        TopicPartition partition = split.getPartition();
+        // Seek start consuming position for assigned split.
+        CursorPosition position = startCursor.position(split);
+
+        // Validate the start position and assign it to consumer.
+        if (position.getType() == CursorPosition.Type.MESSAGE_ID) {
+            MessageId initialMessageId = position.getMessageId();
+
+            if (!initialMessageId.equals(MessageId.earliest)
+                    && !initialMessageId.equals(MessageId.latest)) {
+                MessageId lastMessageId =
+                        sneakyAdmin(
+                                () ->
+                                        pulsarAdmin
+                                                .topics()
+                                                .getLastMessageId(partition.getFullTopicName()));
+                if (initialMessageId.compareTo(lastMessageId) > 0) {
+                    CursorVerification verification = sourceConfiguration.getVerifyInitialOffsets();
+                    if (verification == FAIL_ON_MISMATCH) {
+                        throw new IllegalArgumentException(
+                                "Invalid start position "
+                                        + initialMessageId
+                                        + " for partition "
+                                        + partition);
+                    } else {
+                        if (verification == WARN_ON_MISMATCH) {
+                            LOG.warn(
+                                    "Start position {} is wrong, reset to valid position {}",
+                                    initialMessageId,
+                                    lastMessageId);
+                        }
+                        position = new CursorPosition(lastMessageId);
+                    }
+                }
+            }
+        }
+
+        // Set position for current consumer. We don't need to use
+        // consumer.redeliverUnacknowledgedMessages()
+        position.seekPosition(consumer);

Review comment:
       After reading the Pulsar internal `seek(MessageId)` [logic](https://github.com/apache/pulsar/blob/36d5738412bb1ed9018178007bf63d9202b675db/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1151) and `seek(timestamp)` [logic](https://github.com/apache/pulsar/blob/36d5738412bb1ed9018178007bf63d9202b675db/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L661), I found that I was wrong.
   
   Pulsar implicitly handles a wrong message id or timestamp. It resets to `MessageId.latest` when `initialMessageId > lastMessageId`, and resets the cursor to the first position in the ledger when the timestamp doesn't exist.
   
   So this validation logic is useless and the Pulsar client can handle most situations. We can just try-catch the `consumer.seek` method and throw an exception on `FAIL_ON_MISMATCH`, as sketched below.
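   
   Something like this (a sketch only; it assumes `seekPosition` surfaces the client exception and that we keep the `CursorVerification` setting):

```java
try {
    // Pulsar already clamps an out-of-range MessageId to MessageId.latest and
    // an unknown timestamp to the first position in the ledger, so no
    // pre-validation against getLastMessageId() is required here.
    position.seekPosition(consumer);
} catch (Exception e) {
    if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
        throw new IllegalArgumentException(
                "Invalid start position for partition " + partition, e);
    }
    LOG.warn("Failed to seek start position {} for partition {}", position, partition, e);
}
```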

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceSplit} implementation for a Pulsar partition. */
+@Internal
+public class PulsarPartitionSplit implements SourceSplit {
+
+    private final TopicPartition partition;
+
+    private final StartCursor startCursor;
+
+    private final StopCursor stopCursor;
+
+    @Nullable private final MessageId latestConsumedId;

Review comment:
       This class doesn't implement `Serializable`. Maybe I should just remove the serialization logic in `PulsarPartitionSplitSerializer`.
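   
   If the cursors are dropped from the wire format, the serializer only needs the checkpoint-relevant fields. A rough sketch of the write path (the method shape and getters are assumptions):

```java
// Hypothetical write path in PulsarPartitionSplitSerializer: persist only the
// fields that must survive a checkpoint, skipping the cursors entirely.
public byte[] serialize(PulsarPartitionSplit split) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos)) {
        out.writeUTF(split.getPartition().getFullTopicName());

        MessageId latestConsumedId = split.getLatestConsumedId();
        out.writeBoolean(latestConsumedId != null);
        if (latestConsumedId != null) {
            byte[] idBytes = latestConsumedId.toByteArray();
            out.writeInt(idBytes.length);
            out.write(idBytes);
        }

        out.flush();
        return baos.toByteArray();
    }
}
```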



