[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996839#comment-15996839 ]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114793528

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
                 throw new Exception("The partitions were not set for the consumer");
             }
     
    -        // we need only do work, if we actually have partitions assigned
    -        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -            // create the fetcher that will communicate with the Kafka brokers
    -            final AbstractFetcher<T, ?> fetcher = createFetcher(
    -                    sourceContext,
    -                    subscribedPartitionsToStartOffsets,
    -                    periodicWatermarkAssigner,
    -                    punctuatedWatermarkAssigner,
    -                    (StreamingRuntimeContext) getRuntimeContext(),
    -                    offsetCommitMode);
    -
    -            // publish the reference, for snapshot-, commit-, and cancel calls
    -            // IMPORTANT: We can only do that now, because only now will calls to
    -            //            the fetchers 'snapshotCurrentState()' method return at least
    -            //            the restored offsets
    -            this.kafkaFetcher = fetcher;
    -            if (!running) {
    -                return;
    -            }
    -
    -            // (3) run the fetcher' main work method
    -            fetcher.runFetchLoop();
    +        this.runThread = Thread.currentThread();
    +
    +        // mark the subtask as temporarily idle if there are no initial seed partitions;
    +        // once this subtask discovers some partitions and starts collecting records, the subtask's
    +        // status will automatically be triggered back to be active.
    +        if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +            sourceContext.markAsTemporarilyIdle();
             }
    -        else {
    -            // this source never completes, so emit a Long.MAX_VALUE watermark
    -            // to not block watermark forwarding
    -            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
    -            // wait until this is canceled
    -            final Object waitLock = new Object();
    +        // create the fetcher that will communicate with the Kafka brokers
    +        final AbstractFetcher<T, ?> fetcher = createFetcher(
    +                sourceContext,
    +                subscribedPartitionsToStartOffsets,
    +                periodicWatermarkAssigner,
    +                punctuatedWatermarkAssigner,
    +                (StreamingRuntimeContext) getRuntimeContext(),
    +                offsetCommitMode);
    +
    +        // publish the reference, for snapshot-, commit-, and cancel calls
    +        // IMPORTANT: We can only do that now, because only now will calls to
    +        //            the fetchers 'snapshotCurrentState()' method return at least
    +        //            the restored offsets
    +        this.kafkaFetcher = fetcher;
    +
    +        if (!running) {
    +            return;
    +        }
    +
    +        // depending on whether we were restored with the current state version (1.3),
    +        // remaining logic branches off into 2 paths:
    +        //  1) New state - main fetcher loop executed as separate thread, with this
    +        //                 thread running the partition discovery loop
    +        //  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +        if (!restoredFromOldState) {
    +            final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +            Thread fetcherThread = new Thread(new Runnable() {
    +                @Override
    +                public void run() {
    +                    try {
    +                        // run the fetcher' main work method
    +                        kafkaFetcher.runFetchLoop();
    --- End diff --

    Hmm, there actually isn't any good reason that this is required, as far as I can think of.
    One point regarding a non-main thread emitting records: the Kafka 0.8 fetcher has actually always emitted elements from different threads.
    So I didn't really make any assumption about which thread (main or separate) runs the fetcher loop and which one runs the discovery loop.
    But I think it's also OK to swap this here.


> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
> I propose to implement this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature will be dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, and will rely on Kafka
> to dynamically reassign partitions to it whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after rebalancing happens. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently
> responsible for to an external store (or Kafka) before a rebalance; after
> the rebalance, once the subtasks get the new partitions they'll be reading
> from, they'll read from the external store to get the last offsets for
> their new partitions (partitions which don't have offset entries in the
> store are new partitions causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that a subtask might not be assigned the partitions that
> match the snapshot it is restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> each subtask attempts to find the last offset for the partitions it is
> holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, so coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of
> topics), KafkaConsumer#subscribe() has a corresponding overload that takes
> a fixed list of topics. We can simply decide which subscribe() overload to
> use depending on whether a regex Pattern or a list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may hold partitions after a rebalance.
> Instead, un-assigned subtasks should be running a fetcher instance too and
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
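
A note on the "topic-n*" example in the description: the proposed constructors take a java.util.regex.Pattern, so the subscription string is a regular expression, not a glob. The sketch below is only an illustration of that difference. It assumes the pattern is applied to the whole topic name (Matcher#matches semantics), which the description does not state, and TopicPatternSketch and matchingTopics are hypothetical names that are not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink or Kafka.
class TopicPatternSketch {

    /** Returns the topics whose full name matches the pattern, in input order. */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            // matches() requires the whole topic name to match, not just a prefix
            // (assumed semantics for the subscription, see the note above)
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic-n1", "topic-n2", "topic-m1", "topic-n");

        // Regex form of the glob-like "topic-n*": prints [topic-n1, topic-n2, topic-n]
        System.out.println(matchingTopics(Pattern.compile("topic-n.*"), topics));

        // Read literally as a regex, "topic-n*" means "topic-" followed by zero or
        // more 'n' characters: prints [topic-n]
        System.out.println(matchingTopics(Pattern.compile("topic-n*"), topics));
    }
}
```

Under that assumption, a user who wants "topic-n1", "topic-n2", ... would pass Pattern.compile("topic-n.*") (or a stricter expression such as "topic-n\\d+") rather than the literal string from the example.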