AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r684403885
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; + +/** The pojo class for pulsar topic metadata information. */ +public final class TopicMetadata { + + /** + * The name of the topic, it would be a {@link TopicNameUtils#topicName(String)} which don't + * contains partition information. + */ + private final String name; + + /** If this topic is a partitioned topic. */ + private final boolean partitioned; + + /** The size for a partitioned topic. It would be zero for non-partitioned topic. */ + private final int partitionSize; + + public TopicMetadata(String name, int partitionSize) { + this.name = name; + this.partitioned = partitionSize != NON_PARTITIONED; Review comment: I'd keep this POJO dumb and pass all 3 field values as ctor parameters. Then you only need the logic part `partitionSize != NON_PARTITIONED` in `createTopicPartitions`. 
Remember that this is public API and users want to test their custom `RangeGenerator` by creating instances of this class. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; + +/** The pojo class for pulsar topic metadata information. */ +public final class TopicMetadata { Review comment: Add `@PublicEvolving`, as it's part of the `RangeGenerator` API. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarClosingUtils.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.utils; + +import org.apache.flink.connector.pulsar.common.exception.PulsarClosingException; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** A util class to help with a clean component shutdown. */ +public final class PulsarClosingUtils { Review comment: I think this whole class is unneeded. Flink already cancels task with a timeout and interruptions and you pretty much just replicate the mechanism on another level. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Pattern; + +import static java.util.stream.Collectors.toSet; + +/** Subscribe to matching topics based on topic pattern. */ +public class TopicPatternSubscriber extends BasePulsarSubscriber { + private static final long serialVersionUID = 3307710093243745104L; + + private final Pattern topicPattern; + private final RegexSubscriptionMode subscriptionMode; + private final String namespace; + + public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) { + this.topicPattern = topicPattern; + this.subscriptionMode = subscriptionMode; + + // Extract the namespace from topic pattern regex. + // If no namespace provided in the regex, we would directly use "default" as the namespace. 
+ TopicName destination = TopicName.get(topicPattern.toString()); + NamespaceName namespaceName = destination.getNamespaceObject(); + this.namespace = namespaceName.toString(); + } + + @Override + public Set<TopicPartition> getSubscribedTopicPartitions( + PulsarAdmin pulsarAdmin, + RangeGenerator rangeGenerator, + int parallelism, + SourceConfiguration sourceConfiguration) { + try { + return pulsarAdmin + .namespaces() + .getTopics(namespace) + .parallelStream() + .filter( + topic -> { + TopicName topicName = TopicName.get(topic); + // Filter the topic persistence. + switch (subscriptionMode) { + case PersistentOnly: + return topicName.isPersistent(); + case NonPersistentOnly: + return !topicName.isPersistent(); + default: + // RegexSubscriptionMode.AllTopics + return true; + } + }) Review comment: Could you please extract such large lambdas into a method and potentially use a method reference instead? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/DivideRangeGenerator.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic.range; + +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; + +/** + * This range generator would divide the range by the flink source parallelism. It would be the + * default implementation for {@link SubscriptionType#Key_Shared} subscription. + */ +public class DivideRangeGenerator implements RangeGenerator { + private static final long serialVersionUID = -7292650922683609268L; + + @Override + public List<TopicRange> range(TopicMetadata metadata, int parallelism) { + List<TopicRange> results = new ArrayList<>(parallelism); + + int rangeSize = RANGE_SIZE / parallelism; + int startRange = 0; + + for (int i = 0; i < parallelism - 1; i++) { + int nextStartRange = startRange + rangeSize; + results.add(new TopicRange(startRange, nextStartRange - 1)); + startRange = nextStartRange; + } + results.add(new TopicRange(startRange, MAX_RANGE)); Review comment: With this algorithm, the last partition ends up comparatively huge. Consider a parallelism of 241. Then rangeSize=65536/241=271 (.93...) So each TopicRange has the size of 271, except the last one, which also gets the remaining 225 keys and ends up with 496 keys; so almost double the size. 
A fairer approach is to not precompute `rangeSize` and instead use ``` for (int i = 1; i < parallelism; i++) { int nextStartRange = i * RANGE_SIZE / parallelism; results.add(new TopicRange(startRange, nextStartRange - 1)); startRange = nextStartRange; } ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaBase.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; + +/** + * The base deserialization schema implementation, user can implement their own {@code + * PulsarDeserializationSchema} by extends this class. Or you can just given custom {@link + * PulsarSchemaFactory}, {@link PulsarMessageDeserializer} and {@link TypeInformation}. + * + * @param <M> The message type for pulsar. + * @param <T> The result type (source type) for flink. 
+ */ +@PublicEvolving +public class PulsarDeserializationSchemaBase<M, T> implements PulsarDeserializationSchema<M, T> { + private static final long serialVersionUID = 7278149192690933421L; + + private final PulsarSchemaFactory<M> schemaFactory; + + private final PulsarMessageDeserializer<M, T> messageDeserializer; + + private final TypeInformation<T> typeInformation; + + public PulsarDeserializationSchemaBase( + PulsarSchemaFactory<M> schemaFactory, + PulsarMessageDeserializer<M, T> pulsarDeserializer, + TypeInformation<T> typeInformation) { + this.schemaFactory = schemaFactory; + this.messageDeserializer = pulsarDeserializer; + this.typeInformation = typeInformation; + } + + @Override + public void deserialize(Message<M> message, Collector<T> out) throws Exception { + Iterable<T> iterable = messageDeserializer.deserialize(message); + // This null asserting is required for avoiding NP. + if (iterable != null) { Review comment: It should never be null. It would be better to get an NPE here. (`PulsarMessageDeserializer#deserialize` is not annotated as `@Nullable`). ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * Pulsar's FetcherManager implementation for ordered consuming. This class is needed to help + * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link + * PulsarOrderedPartitionSplitReader}. + * + * @param <T> The message type for pulsar decoded message. 
+ */ +public class PulsarOrderedFetcherManager<T> extends PulsarFetcherManagerBase<T> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedFetcherManager.class); + + public PulsarOrderedFetcherManager( + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> elementsQueue, + Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } + + public void acknowledgeMessages(Map<TopicPartition, MessageId> cursorsToCommit) { + LOG.debug("Acknowledge messages {}", cursorsToCommit); + cursorsToCommit.forEach( + (partition, messageId) -> { + SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = + getOrCreateFetcher(partition.toString()); + enqueueAcknowledgeTask(fetcher, partition, messageId); + }); + } + + private void enqueueAcknowledgeTask( Review comment: You are not really enqueuing a task here. Maybe just `acknowledge`? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import java.util.function.Supplier; + +import static org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY; +import static org.apache.flink.connector.pulsar.common.utils.PulsarClosingUtils.closeWithTimeout; + +/** + * This 
factory class is used for creating different types of source reader for different + * subscription type. + * + * <ol> + * <li>Failover, Exclusive: We would create PulsarOrderedSourceReader. + * <li>Shared, Key_Shared: We would create PulsarUnorderedSourceReader. + * </ol> + */ +public final class PulsarSourceReaderFactory { + + private PulsarSourceReaderFactory() { + // No public constructor. + } + + public static <IN, OUT> SourceReader<OUT, PulsarPartitionSplit> create( + SourceReaderContext readerContext, + PulsarDeserializationSchema<IN, OUT> deserializationSchema, + PulsarClient pulsarClient, + PulsarAdmin pulsarAdmin, + Configuration configuration, + SourceConfiguration sourceConfiguration, + ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer) { + // Create a message queue with the predefined source option. + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue = + new FutureCompletingBlockingQueue<>( + configuration.getInteger(ELEMENT_QUEUE_CAPACITY)); + + // Create the required close component for pulsar source reader. + Closer splitCloser = Closer.create(); + + splitCloser.register( + () -> + closeWithTimeout( + "PulsarClient", + pulsarClient::close, + sourceConfiguration.getCloseTimeoutMs())); + splitCloser.register( + () -> + closeWithTimeout( + "PulsarAdmin", + pulsarAdmin::close, + sourceConfiguration.getCloseTimeoutMs())); Review comment: Why do we need to add this here? Can't we just close them in the reader (it's owned by the reader after all). Since Closer is Closeable, you could have a Closer in the reader if you want to preserve order and get the free suppression chaining. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.source; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import 
java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * The source reader for pulsar subscription Failover and Exclusive, which consumes the ordered + * messages. + */ +public class PulsarOrderedSourceReader<IN, OUT> extends PulsarSourceReaderBase<OUT> { Review comment: `@Internal` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import java.util.function.Supplier; + +import static org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY; +import static org.apache.flink.connector.pulsar.common.utils.PulsarClosingUtils.closeWithTimeout; + +/** + * This 
factory class is used for creating different types of source reader for different + * subscription type. + * + * <ol> + * <li>Failover, Exclusive: We would create PulsarOrderedSourceReader. + * <li>Shared, Key_Shared: We would create PulsarUnorderedSourceReader. + * </ol> + */ +public final class PulsarSourceReaderFactory { + + private PulsarSourceReaderFactory() { + // No public constructor. + } + + public static <IN, OUT> SourceReader<OUT, PulsarPartitionSplit> create( + SourceReaderContext readerContext, + PulsarDeserializationSchema<IN, OUT> deserializationSchema, + PulsarClient pulsarClient, + PulsarAdmin pulsarAdmin, + Configuration configuration, + SourceConfiguration sourceConfiguration, + ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer) { + // Create a message queue with the predefined source option. + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue = + new FutureCompletingBlockingQueue<>( + configuration.getInteger(ELEMENT_QUEUE_CAPACITY)); + + // Create the required close component for pulsar source reader. + Closer splitCloser = Closer.create(); + + splitCloser.register( + () -> + closeWithTimeout( + "PulsarClient", + pulsarClient::close, + sourceConfiguration.getCloseTimeoutMs())); + splitCloser.register( + () -> + closeWithTimeout( + "PulsarAdmin", + pulsarAdmin::close, + sourceConfiguration.getCloseTimeoutMs())); + + // Create different pulsar source reader by subscription type. + SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType(); + if (subscriptionType == SubscriptionType.Failover + || subscriptionType == SubscriptionType.Exclusive) { + // Create a ordered split reader supplier. 
+ Supplier<PulsarOrderedPartitionSplitReader<IN, OUT>> splitReaderSupplier = + () -> { + PulsarOrderedPartitionSplitReader<IN, OUT> reader = + new PulsarOrderedPartitionSplitReader<>( + pulsarClient, + pulsarAdmin, + configuration, + sourceConfiguration, + consumerConfigurationCustomizer, + deserializationSchema); + + splitCloser.register(reader); + return reader; + }; + + return new PulsarOrderedSourceReader<>( + elementsQueue, + splitReaderSupplier, + configuration, + readerContext, + sourceConfiguration, + splitCloser); + } else if (subscriptionType == SubscriptionType.Shared + || subscriptionType == SubscriptionType.Key_Shared) { + TransactionCoordinatorClient coordinatorClient = + ((PulsarClientImpl) pulsarClient).getTcClient(); + if (coordinatorClient == null && !sourceConfiguration.isEnableAutoCommitCursor()) { + throw new PulsarRuntimeException("Transaction is required but didn't enabled"); + } + + Supplier<PulsarUnorderedPartitionSplitReader<IN, OUT>> splitReaderSupplier = + () -> { + PulsarUnorderedPartitionSplitReader<IN, OUT> reader = + new PulsarUnorderedPartitionSplitReader<>( + pulsarClient, + pulsarAdmin, + configuration, + sourceConfiguration, + consumerConfigurationCustomizer, + deserializationSchema, + coordinatorClient); + + splitCloser.register(reader); Review comment: The `PulsarUnorderedPartitionSplitReader` should already be closed by the `SplitFetcherManager`. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/exception/PulsarExceptionUtils.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.exception; + +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Util class for pulsar checked exceptions. Sneaky throw {@link PulsarAdminException} and {@link + * PulsarClientException}. + */ +public final class PulsarExceptionUtils { Review comment: Btw `PulsarClientExceptions` could be simply wrapped in `UncheckedIOException`. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.common.naming.TopicName; + +/** util for topic name. */ +public final class TopicNameUtils { + + private TopicNameUtils() { + // No public constructor. + } + + /** + * Ensure the given topic name should be a topic without partition information. {@link + * TopicName} has a internal cache for the query result, so you can call this method multiple + * times. + */ + public static String topicName(String topic) { + return TopicName.get(topic).getPartitionedTopicName(); + } + + /** + * Create a topic name with partition information. {@link TopicName} has a internal cache for Review comment: Same comment as above. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.common.naming.TopicName; + +/** util for topic name. 
*/ +public final class TopicNameUtils { Review comment: Is this `@PublicEvolving`? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.io.Serializable; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Metadata of a partitioned topic. */ Review comment: This is a bit confusing. By this comment I'd expect `TopicMetadata`. In fact, both classes are so similar, I have a hard time understanding why we need both... Could you check if you can merge these classes and, if not, make sure that the JavaDoc of these potentially public classes makes it clear what their purpose is? Is this public API?
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. 
+ */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. + * + * <p>Please visit <a + * href="ttp://pulsar.apache.org/docs/en/schema-understand/#schema-type">pulsar + * documentation</a> for detailed schema type clarify. + */ + @SuppressWarnings("unchecked") + private static <T> TypeInformation<T> createTypeInformation( + PulsarSchemaFactory<T> schemaFactory) { + Schema<T> schema = schemaFactory.create(); + // SchemaInfo contains all the required information for deserializing. 
+ SchemaInfo schemaInfo = schema.getSchemaInfo(); + SchemaType schemaType = schemaInfo.getType(); + + TypeInformation<?> information = null; + switch (schemaType) { + case STRING: + information = Types.STRING; + break; + case BOOLEAN: + information = Types.BOOLEAN; + break; + case INT8: + information = Types.BYTE; + break; + case INT16: + information = Types.SHORT; + break; + case INT32: + information = Types.INT; + break; + case INT64: + information = Types.LONG; + break; + case FLOAT: + information = Types.FLOAT; + break; + case DOUBLE: + information = Types.DOUBLE; + break; + case DATE: + // Since pulsar use this type for both util.Date and sql.Date, + // we just choose util.Date here. + information = BasicTypeInfo.DATE_TYPE_INFO; + break; + case TIME: + information = Types.SQL_TIME; + break; + case TIMESTAMP: + information = Types.SQL_TIMESTAMP; + break; + case INSTANT: + information = Types.INSTANT; + break; + case LOCAL_DATE: + information = Types.LOCAL_DATE; + break; + case LOCAL_TIME: + information = Types.LOCAL_TIME; + break; + case LOCAL_DATE_TIME: + information = Types.LOCAL_DATE_TIME; + break; + case BYTES: + information = Types.PRIMITIVE_ARRAY(Types.BYTE); + break; + } + + if (information == null) { + // Try to extract the type info by using flink provided utils. + try { + // Support protobuf class after flink support it natively. + information = + TypeExtractor.createTypeInfo( + PulsarSchemaFactory.class, schemaFactory.getClass(), 0, null, null); + } catch (Exception e) { + // Nothing to do here, this exception is accepted and don't need. + } + } + + if (information != null) { + return (TypeInformation<T>) information; + } else { + String schemaInfoStr = new String(schemaInfo.getSchema(), StandardCharsets.UTF_8); + throw new FlinkRuntimeException( + "Unsupported pulsar schema, please provide the related TypeInformation. The schema info was: \n" + + schemaInfoStr); + } Review comment: Good solution! 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaInitializationContext.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +/** + * Convert the {@link SourceReaderContext} into a {@link + * DeserializationSchema.InitializationContext}, we would use a pulsar named metric group for this + * content. + */ +@Internal +public class PulsarSchemaInitializationContext Review comment: `PulsarDeserializationSchemaInitializationContext` - we need another context for serialization later. 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaWrapper.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; + +import org.apache.pulsar.client.api.Schema; + +import java.util.Collections; + +/** + * A {@link PulsarDeserializationSchema} implementation which based on the given flink's {@link + * DeserializationSchema}. We would consume the message as byte array from pulsar and deserialize it + * by using flink serialization logic. + * + * @param <T> The output type of the message. 
+ */ +@Internal +class PulsarDeserializationSchemaWrapper<T> extends PulsarDeserializationSchemaBase<byte[], T> { + private static final long serialVersionUID = -630646912412751300L; + + private final DeserializationSchema<T> deserializationSchema; + + public PulsarDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) { + super( + () -> Schema.BYTES, + createDeserializer(deserializationSchema), + deserializationSchema.getProducedType()); + this.deserializationSchema = deserializationSchema; + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + // Initialize it for some custom logic. + deserializationSchema.open(context); + } + + private static <T> PulsarMessageDeserializer<byte[], T> createDeserializer( + DeserializationSchema<T> deserializationSchema) { + return message -> { + if (message != null) { Review comment: When would `message==null`? We should add a `Preconditions#checkNotNull` at the call site of this lambda. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; + +/** PulsarSubscriber abstract class to simplify Pulsar admin related operations. */ +public abstract class BasePulsarSubscriber implements PulsarSubscriber { + private static final long serialVersionUID = 2053021503331058888L; + + protected TopicMetadata queryTopicMetadata(PulsarAdmin pulsarAdmin, String topicName) { + // Drop the complete topic name for a clean partitioned topic name. + String completeTopicName = TopicNameUtils.topicName(topicName); + try { + PartitionedTopicMetadata metadata = + pulsarAdmin.topics().getPartitionedTopicMetadata(completeTopicName); + return new TopicMetadata(topicName, metadata.partitions); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == 404) { + // Return null for skipping the topic metadata query. + return null; + } else { + // This method would cause the failure for subscriber. 
+ throw new PulsarRuntimeException(e); + } + } + } + + protected List<TopicPartition> createPartitions( Review comment: nit: This sounds a bit like you are creating Pulsar partitions here. How about `toTopicPartitions`? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.common.naming.TopicName; + +/** util for topic name. */ +public final class TopicNameUtils { + + private TopicNameUtils() { + // No public constructor. + } + + /** + * Ensure the given topic name should be a topic without partition information. {@link Review comment: I'd leave out everything about `TopicName` here. It's an implementation detail of this method and you mention an implementation detail of `TopicName`. I also don't see a need to encourage a user to call this method multiple times.
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic.range; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; + +import java.io.Serializable; +import java.util.List; + +/** A generator for generating the {@link TopicRange} for given topic. */ Review comment: I think this class deserves more information. Why would you implement it? When would you choose one of the two provided implementations? 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; + +/** PulsarSubscriber abstract class to simplify Pulsar admin related operations. */ +public abstract class BasePulsarSubscriber implements PulsarSubscriber { + private static final long serialVersionUID = 2053021503331058888L; + + protected TopicMetadata queryTopicMetadata(PulsarAdmin pulsarAdmin, String topicName) { + // Drop the complete topic name for a clean partitioned topic name. + String completeTopicName = TopicNameUtils.topicName(topicName); + try { + PartitionedTopicMetadata metadata = + pulsarAdmin.topics().getPartitionedTopicMetadata(completeTopicName); + return new TopicMetadata(topicName, metadata.partitions); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == 404) { + // Return null for skipping the topic metadata query. + return null; + } else { + // This method would cause the failure for subscriber. 
+ throw new PulsarRuntimeException(e); + } + } + } + + protected List<TopicPartition> createPartitions( + TopicMetadata metadata, + int parallelism, + SourceConfiguration sourceConfiguration, + RangeGenerator rangeGenerator) { + if (!metadata.isPartitioned()) { + // For non-partitioned topic. + return rangeGenerator.range(metadata, parallelism).stream() + .map(range -> new TopicPartition(metadata.getName(), -1, range)) + .collect(toList()); + } else { + SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType(); + if (subscriptionType == SubscriptionType.Exclusive + || subscriptionType == SubscriptionType.Failover) { + // We should generate the topic partition for each partitions. + return IntStream.range(0, metadata.getPartitionSize()) + .boxed() + .flatMap( + partitionId -> + rangeGenerator.range(metadata, parallelism).stream() + .map( + range -> + new TopicPartition( + metadata.getName(), + partitionId, + range))) + .collect(toList()); + } else if (subscriptionType == SubscriptionType.Shared + || subscriptionType == SubscriptionType.Key_Shared) { + // We should generate only one topic partition for all topic partitions. + return rangeGenerator.range(metadata, parallelism).stream() + .map(range -> new TopicPartition(metadata.getName(), -1, range)) + .collect(toList()); + } else { + throw new UnsupportedOperationException( + "This subscription type is not " + + subscriptionType + + " supported currently."); Review comment: ```suggestion throw new UnsupportedOperationException( "This subscription type " + subscriptionType + " is not supported currently."); ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaFactory.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.pulsar.client.api.Schema; + +import java.io.Serializable; + +/** + * Since pulsar's default schema implementation is not serializable, we introduce the interface for + * allowing user defining a lambda based schema factory. + * + * <p>The following example shows a lambda based schema factory which returns a <code>Schema.STRING + * </code> + * + * <pre>{@code () -> Schema.STRING}</pre> + * + * @param <T> The message type. + */ +@PublicEvolving +@FunctionalInterface +public interface PulsarSchemaFactory<T> extends Serializable { Review comment: Another approach (that would fit better here imho) is to use a `SerializableSchema` wrapper. See https://github.com/AHeise/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34 You can use it as an internal class that is used to wrap any `Schema` reference in the factory methods. 
``` static <F> PulsarDeserializationSchema<F, F> pulsarSchema( Schema<F> schema) { return new PulsarSchemaWrapper<>(new SerializablePulsarSchema(schema)); // or even better: wrap it in ctor while assigning the field } ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Collections.singletonList; + +/** + * Common fetcher manager abstraction for both ordered & unordered message. + * + * @param <T> The decoded message type for flink. + */ +public abstract class PulsarFetcherManagerBase<T> + extends SingleThreadFetcherManager<PulsarMessage<T>, PulsarPartitionSplit> { + + private final Map<String, Integer> splitFetcherMapping = new HashMap<>(); + private final Map<Integer, Boolean> fetcherStatus = new HashMap<>(); + + /** + * Creates a new SplitFetcherManager with multiple I/O threads. + * + * @param elementsQueue The queue that is used to hand over data from the I/O thread (the + * fetchers) to the reader (which emits the records and book-keeps the state. This must be + * the same queue instance that is also passed to the {@link SourceReaderBase}. 
+ * @param splitReaderSupplier The factory for the split reader that connects to the source + */ + protected PulsarFetcherManagerBase( + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> elementsQueue, + Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } + + /** + * Override this method for supporting multiple thread fetching, one fetcher thread for one + * split. + */ + @Override + public void addSplits(List<PulsarPartitionSplit> splitsToAdd) { + for (PulsarPartitionSplit split : splitsToAdd) { + SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = + getOrCreateFetcher(split.splitId()); + fetcher.addSplits(singletonList(split)); + // This method could be executed multiple times. + startFetcher(fetcher); + } + } + + @Override + protected void startFetcher(SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher) { + Boolean status = fetcherStatus.get(fetcher.fetcherId()); + if (status == null || !status) { + fetcherStatus.put(fetcher.fetcherId(), true); + super.startFetcher(fetcher); + } + } + + protected synchronized SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> getOrCreateFetcher( Review comment: This is all single-threaded, we can safely drop `synchronized`. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. + */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? 
extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. + * + * <p>Please visit <a + * href="ttp://pulsar.apache.org/docs/en/schema-understand/#schema-type">pulsar + * documentation</a> for detailed schema type clarify. + */ + @SuppressWarnings("unchecked") + private static <T> TypeInformation<T> createTypeInformation( + PulsarSchemaFactory<T> schemaFactory) { + Schema<T> schema = schemaFactory.create(); + // SchemaInfo contains all the required information for deserializing. + SchemaInfo schemaInfo = schema.getSchemaInfo(); + SchemaType schemaType = schemaInfo.getType(); + + TypeInformation<?> information = null; + switch (schemaType) { + case STRING: + information = Types.STRING; + break; + case BOOLEAN: + information = Types.BOOLEAN; + break; + case INT8: + information = Types.BYTE; + break; + case INT16: + information = Types.SHORT; + break; + case INT32: + information = Types.INT; + break; + case INT64: + information = Types.LONG; + break; + case FLOAT: + information = Types.FLOAT; + break; + case DOUBLE: + information = Types.DOUBLE; + break; + case DATE: + // Since pulsar use this type for both util.Date and sql.Date, + // we just choose util.Date here. 
+ information = BasicTypeInfo.DATE_TYPE_INFO; + break; + case TIME: + information = Types.SQL_TIME; + break; + case TIMESTAMP: + information = Types.SQL_TIMESTAMP; + break; + case INSTANT: + information = Types.INSTANT; + break; + case LOCAL_DATE: + information = Types.LOCAL_DATE; + break; + case LOCAL_TIME: + information = Types.LOCAL_TIME; + break; + case LOCAL_DATE_TIME: + information = Types.LOCAL_DATE_TIME; + break; + case BYTES: + information = Types.PRIMITIVE_ARRAY(Types.BYTE); + break; + } + + if (information == null) { + // Try to extract the type info by using flink provided utils. + try { + // Support protobuf class after flink support it natively. Review comment: Should we create a follow-up ticket here? protobuf support is planned in second half of this year. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. + */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. Review comment: Is this a general limitation or are we just cutting the scope here for now and enhance that in the future? 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/DivideRangeGenerator.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic.range; + +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; + +/** + * This range generator would divide the range by the flink source parallelism. It would be the + * default implementation for {@link SubscriptionType#Key_Shared} subscription. 
+ */ +public class DivideRangeGenerator implements RangeGenerator { Review comment: `UniformRangeGenerator` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Collector; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; + +import java.io.Serializable; + +/** + * A schema bridge for deserializing the pulsar's <code>Message<?></code> into a flink managed + * instance. We support both the pulsar's self managed schema and flink managed schema. 
+ * + * @param <M> The decode message type from pulsar client, which would create a message {@link + * SchemaInfo} from this type. + * @param <T> The output message type for sinking to downstream flink operator. + */ +@PublicEvolving +public interface PulsarDeserializationSchema<M, T> extends Serializable, ResultTypeQueryable<T> { Review comment: The whole deserializer package is well-crafted except for some small nits. However, I'd like to understand if there is an obvious benefit in using an explicit message type. Let me first explain how we do it in the other connectors and contrast that with your approach. Pretty much all connectors to message systems use a fixed Schema.BYTES and convert it to user-defined types with `DeserializationSchema` in the same way as you do it in `PulsarDeserializationSchemaWrapper`. Furthermore, when the system provides its own deserialization framework, we add an additional adapter for that, like `PulsarSchemaWrapper`, but we'd put the deserialization logic inside the wrapper. So in Pulsar's case, we would call `Schema#decode` inside the `PulsarSchemaWrapper` that always receives a `Message<byte[]>`. The advantage of that approach is that we can omit the input type parameter (IN or M) - it's always byte[] from Flink's point of view. That's easier for users and will yield the same result. The drawback is that we apply the deserialization outside of the source client. In most cases, the clients offer a raw view, such that there is no performance penalty at all. However, in a few cases, the client may optimize it if we push down the schema to it (pruning some columns = reducing the byte[] while requesting the data). What I have seen of Pulsar so far makes me believe that there is no real benefit in pushing the schema into the client, while it does make the code more complicated. It would be nice to know if there is a specific reason for your different approach. 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * Pulsar's FetcherManager implementation for ordered consuming. This class is needed to help + * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link + * PulsarOrderedPartitionSplitReader}. + * + * @param <T> The message type for pulsar decoded message. + */ +public class PulsarOrderedFetcherManager<T> extends PulsarFetcherManagerBase<T> { Review comment: `@Internal` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. + */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? 
extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. + * + * <p>Please visit <a + * href="ttp://pulsar.apache.org/docs/en/schema-understand/#schema-type">pulsar + * documentation</a> for detailed schema type clarify. + */ + @SuppressWarnings("unchecked") + private static <T> TypeInformation<T> createTypeInformation( + PulsarSchemaFactory<T> schemaFactory) { + Schema<T> schema = schemaFactory.create(); + // SchemaInfo contains all the required information for deserializing. + SchemaInfo schemaInfo = schema.getSchemaInfo(); + SchemaType schemaType = schemaInfo.getType(); + + TypeInformation<?> information = null; + switch (schemaType) { Review comment: This switch should be an EnumMap lookup. The EnumMap should be statically created in this class. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; + +import org.apache.pulsar.client.api.Consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Pulsar's FetcherManager implementation for unordered consuming. This class is needed to help + * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link + * PulsarUnorderedPartitionSplitReader}. + * + * @param <T> The message type for pulsar decoded message. 
+ */ +public class PulsarUnorderedFetcherManager<T> extends PulsarFetcherManagerBase<T> { + + public PulsarUnorderedFetcherManager( + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> elementsQueue, + Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } + + public List<PulsarPartitionSplit> snapshotState(long checkpointId) { + List<PulsarPartitionSplit> states = new ArrayList<>(fetchers.size()); + for (Map.Entry<Integer, SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit>> entry : + fetchers.entrySet()) { + SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = entry.getValue(); + PulsarUnorderedPartitionSplitReader<?, T> splitReader = + (PulsarUnorderedPartitionSplitReader<?, T>) fetcher.getSplitReader(); + + PulsarPartitionSplitState state = splitReader.snapshotState(checkpointId); + states.add(state); + } + + return states; + } Review comment: ```suggestion public List<PulsarPartitionSplit> snapshotState(long checkpointId) { return fetchers.values().stream() .map(SplitFetcher::getSplitReader) .map(splitReader -> snapshotReader(checkpointId, splitReader)) .collect(Collectors.toList()); } private PulsarPartitionSplitState snapshotReader( long checkpointId, SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { return ((PulsarUnorderedPartitionSplitReader<?, T>) splitReader) .snapshotState(checkpointId); } ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + +import 
org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import java.util.function.Supplier; + +import static org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY; +import static org.apache.flink.connector.pulsar.common.utils.PulsarClosingUtils.closeWithTimeout; + +/** + * This factory class is used for creating different types of source reader for different + * subscription type. + * + * <ol> + * <li>Failover, Exclusive: We would create PulsarOrderedSourceReader. + * <li>Shared, Key_Shared: We would create PulsarUnorderedSourceReader. + * </ol> + */ +public final class PulsarSourceReaderFactory { Review comment: `@Internal` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; + +import org.apache.pulsar.client.api.Consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Pulsar's FetcherManager implementation for unordered consuming. This class is needed to help + * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link + * PulsarUnorderedPartitionSplitReader}. + * + * @param <T> The message type for pulsar decoded message. + */ +public class PulsarUnorderedFetcherManager<T> extends PulsarFetcherManagerBase<T> { Review comment: `@Internal` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessage.java ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.message; + +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +/** + * The message instance that contains the required information which would be used for committing + * the consuming status. + */ +public class PulsarMessage<T> { Review comment: `@Internal` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/CollectorSupplier.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.message; + +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.util.Collector; + +import org.apache.pulsar.client.api.Message; + +/** + * This collector supplier is providing the {@link Collector} for accepting the deserialized + * messages from pulsar schema. + * + * @param <T> The deserialized pulsar message type, aka the source message type. + */ +public class CollectorSupplier<T> { Review comment: This looks very costly. We create a new collector per record? Why isn't this class directly a `Collector`? Then you would create it only once per batch and still have all the information. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.reader.fetcher; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Collections.singletonList; + +/** + * Common fetcher manager abstraction for both ordered & unordered message. + * + * @param <T> The decoded message type for flink. + */ +public abstract class PulsarFetcherManagerBase<T> + extends SingleThreadFetcherManager<PulsarMessage<T>, PulsarPartitionSplit> { + + private final Map<String, Integer> splitFetcherMapping = new HashMap<>(); + private final Map<Integer, Boolean> fetcherStatus = new HashMap<>(); + + /** + * Creates a new SplitFetcherManager with multiple I/O threads. + * + * @param elementsQueue The queue that is used to hand over data from the I/O thread (the + * fetchers) to the reader (which emits the records and book-keeps the state. This must be + * the same queue instance that is also passed to the {@link SourceReaderBase}. 
+ * @param splitReaderSupplier The factory for the split reader that connects to the source + */ + protected PulsarFetcherManagerBase( + FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> elementsQueue, + Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } + + /** + * Override this method for supporting multiple thread fetching, one fetcher thread for one + * split. + */ + @Override + public void addSplits(List<PulsarPartitionSplit> splitsToAdd) { + for (PulsarPartitionSplit split : splitsToAdd) { + SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = + getOrCreateFetcher(split.splitId()); + fetcher.addSplits(singletonList(split)); + // This method could be executed multiple times. + startFetcher(fetcher); + } + } + + @Override + protected void startFetcher(SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher) { + Boolean status = fetcherStatus.get(fetcher.fetcherId()); + if (status == null || !status) { + fetcherStatus.put(fetcher.fetcherId(), true); + super.startFetcher(fetcher); + } Review comment: ```suggestion if (fetcherStatus.get(fetcher.fetcherId()) != Boolean.TRUE) { fetcherStatus.put(fetcher.fetcherId(), Boolean.TRUE); super.startFetcher(fetcher); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org