[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996621#comment-15996621 ]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114757153

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all partition discoverers.
+ *
+ * <p>This partition discoverer base class implements the logic around bookkeeping
+ * discovered partitions, and using the information to determine whether or not there
+ * are new partitions that the consumer subtask should subscribe to.
+ *
+ * <p>Subclass implementations should simply implement the logic of using the version-specific
+ * Kafka clients to fetch topic and partition metadata.
+ *
+ * <p>Since Kafka clients are generally not thread-safe, partition discoverers should
+ * not be concurrently accessed. The only exception to this would be the {@link #wakeup()}
+ * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
+ */
+public abstract class AbstractPartitionDiscoverer {
+
+    /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
+    private final KafkaTopicsDescriptor topicsDescriptor;
+
+    /** Index of the consumer subtask that this partition discoverer belongs to. */
+    private final int indexOfThisSubtask;
+
+    /** The total number of consumer subtasks. */
+    private final int numParallelSubtasks;
+
+    /** Flag to determine whether or not the discoverer is closed. */
+    private volatile boolean closed = true;
+
+    /**
+     * Flag to determine whether or not the discoverer has been woken up.
+     * When set to {@code true}, {@link #discoverPartitions()} is interrupted as early as possible.
+     * Once interrupted, the flag is reset.
+     */
+    private volatile boolean wakeup;
+
+    /**
+     * Map of topics to their largest partition id discovered by this subtask.
+     * This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or
+     * {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called.
+     *
+     * <p>This is used to remove old partitions from the fetched partition lists. It is sufficient
+     * to keep track of only the largest partition id because the number of partitions of a Kafka
+     * topic can only increase, and partition ids are assigned incrementally.
+     */
+    private final Map<String, Integer> topicsToLargestDiscoveredPartitionId;
+
+    public AbstractPartitionDiscoverer(
+            KafkaTopicsDescriptor topicsDescriptor,
+            int indexOfThisSubtask,
+            int numParallelSubtasks) {
+
+        this.topicsDescriptor = checkNotNull(topicsDescriptor);
+        this.indexOfThisSubtask = indexOfThisSubtask;
+        this.numParallelSubtasks = numParallelSubtasks;
+        this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
+    }
+
+    /**
+     * Opens the partition discoverer, initializing all required Kafka connections.
+     *
+     * <p>NOTE: thread-safety is not guaranteed.
+     */
+    public void open() throws Exception {
+        closed = false;
+        initializeConnections();
+    }
+
+    /**
+     * Closes the partition discoverer, cleaning up all Kafka connections.
+     *
+     * <p>NOTE: thread-safety is not guaranteed.
+     */
+    public void close() throws Exception {
+        closed = true;
+        closeConnections();
+    }
+
+    /**
+     * Interrupt an in-progress discovery attempt by throwing a {@link WakeupException}.
+     * If no attempt is in progress, the immediate next attempt will throw a {@link WakeupException}.
+     *
+     * <p>This method can be called concurrently from a different thread.
+     */
+    public void wakeup() {
+        wakeup = true;
+        wakeupConnections();
+    }
+
+    /**
+     * Execute a partition discovery attempt for this subtask.
+     * This method lets the partition discoverer update what partitions it has discovered so far.
+     *
+     * @return List of discovered new partitions that this subtask should subscribe to.
+     */
+    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
+        if (!closed && !wakeup) {
+            try {
+                List<KafkaTopicPartition> newDiscoveredPartitions;
+
+                // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic patern
--- End diff --

Typo "patern"
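As an aside on the class under review: the Javadoc above explains that keeping only the largest discovered partition id per topic is enough bookkeeping. A minimal, hypothetical sketch of that idea follows (this is not the PR's code; the class name and the plain modulo assignment of partitions to subtasks are assumptions for illustration):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, simplified sketch -- not the actual Flink implementation.
    public class PartitionBookkeepingSketch {

        private final Map<String, Integer> topicsToLargestDiscoveredPartitionId = new HashMap<>();
        private final int indexOfThisSubtask;
        private final int numParallelSubtasks;

        public PartitionBookkeepingSketch(int indexOfThisSubtask, int numParallelSubtasks) {
            this.indexOfThisSubtask = indexOfThisSubtask;
            this.numParallelSubtasks = numParallelSubtasks;
        }

        /**
         * Returns true iff the partition has not been seen before and should be
         * subscribed by this subtask.
         */
        public boolean setAndCheckDiscoveredPartition(String topic, int partitionId) {
            Integer largestSoFar = topicsToLargestDiscoveredPartitionId.get(topic);
            if (largestSoFar != null && partitionId <= largestSoFar) {
                return false; // already discovered in an earlier attempt
            }
            topicsToLargestDiscoveredPartitionId.put(topic, partitionId);
            // Assumed assignment scheme: plain modulo over the partition id.
            return (partitionId % numParallelSubtasks) == indexOfThisSubtask;
        }
    }

This works because Kafka only ever adds partitions to a topic, with incrementally increasing ids, so any id at or below the remembered maximum has already been handled by an earlier discovery attempt.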
> Partition discovery / regex topic subscription for the Kafka consumer
> ----------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
> I propose to implement this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature is dynamic partition
> assignment to subtasks while the Kafka consumer is running. This will
> mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`.
> The KafkaConsumer in each subtask will be added to the same consumer group
> when instantiated, and rely on Kafka to dynamically reassign partitions to
> them whenever a rebalance happens. The registered
> `ConsumerRebalanceListener` is a callback that is called right before and
> after rebalancing happens. We'll use this callback to let each subtask
> commit the last offsets of the partitions it is currently responsible for
> to an external store (or Kafka) before a rebalance; after the rebalance,
> when the subtasks get the new partitions they'll be reading from, they'll
> read from the external store to get the last offsets for their new
> partitions (partitions which don't have offset entries in the store are new
> partitions that caused the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks
> snapshot the offsets of the partitions they are currently holding.
> Restoring will be a bit different in that subtasks might not be assigned
> partitions that match the snapshot they are restored with (since we're
> letting Kafka dynamically assign partitions). There will need to be a
> coordination process where, if a restore state exists, all subtasks first
> commit the offsets they receive (as a result of the restore state) to the
> external store, and then each subtask attempts to find the last offsets for
> the partitions it is holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state and therefore no coordination is
> required.
> I think changing to dynamic partition assignment is also good in the long
> run for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assignment depending on KafkaConsumer#subscribe
> - Remove partition list querying from the constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of
> topics), KafkaConsumer#subscribe() has a corresponding overload for a fixed
> list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or a list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may be assigned partitions after a
> rebalance. Instead, unassigned subtasks should also run a fetcher instance
> and take part in the process pool for the consumer group of the subscribed
> topics.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
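To make the rebalance handling proposed in the quoted issue concrete, here is a minimal, hypothetical sketch against the plain Kafka 0.9 consumer API (not Flink's actual implementation): pattern subscription with a ConsumerRebalanceListener that commits offsets before a rebalance and restores them afterwards. The OffsetStore interface, the class and method names, and the translation of "topic-n*" into the regex "topic-n.*" are assumptions for illustration.

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch of the proposed rebalance handling -- not Flink code.
    public class PatternSubscribeSketch {

        /** Assumed stand-in for the "external store (or Kafka)" in the proposal. */
        public interface OffsetStore {
            void commit(TopicPartition partition, long offset);
            Long lookup(TopicPartition partition); // null if the partition is new
        }

        public static void subscribeWithOffsetHandover(Properties props, final OffsetStore store) {
            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

            // Subscribe to every topic matching "topic-n*"; Kafka reassigns the
            // matching partitions across the consumer group on each rebalance.
            consumer.subscribe(Pattern.compile("topic-n.*"), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Right before a rebalance: save the current position of every
                    // partition this subtask is responsible for.
                    for (TopicPartition partition : partitions) {
                        store.commit(partition, consumer.position(partition));
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Right after a rebalance: resume the newly assigned partitions
                    // from the stored offsets.
                    for (TopicPartition partition : partitions) {
                        Long lastOffset = store.lookup(partition);
                        if (lastOffset != null) {
                            consumer.seek(partition, lastOffset);
                        }
                        // No stored entry means a brand-new partition (the likely
                        // cause of the rebalance); it starts from the consumer's
                        // auto.offset.reset policy.
                    }
                }
            });
        }
    }

Partitions with no stored offset are exactly the newly created ones that triggered the rebalance, which is how the proposal distinguishes new partitions from reassigned ones.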