[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996839#comment-15996839
 ] 

ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114793528
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
                        throw new Exception("The partitions were not set for the consumer");
                }
     
    -           // we need only do work, if we actually have partitions assigned
    -           if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -                   // create the fetcher that will communicate with the Kafka brokers
    -                   final AbstractFetcher<T, ?> fetcher = createFetcher(
    -                                   sourceContext,
    -                                   subscribedPartitionsToStartOffsets,
    -                                   periodicWatermarkAssigner,
    -                                   punctuatedWatermarkAssigner,
    -                                   (StreamingRuntimeContext) getRuntimeContext(),
    -                                   offsetCommitMode);
    -
    -                   // publish the reference, for snapshot-, commit-, and cancel calls
    -                   // IMPORTANT: We can only do that now, because only now will calls to
    -                   //            the fetchers 'snapshotCurrentState()' method return at least
    -                   //            the restored offsets
    -                   this.kafkaFetcher = fetcher;
    -                   if (!running) {
    -                           return;
    -                   }
    -                   
    -                   // (3) run the fetcher' main work method
    -                   fetcher.runFetchLoop();
    +           this.runThread = Thread.currentThread();
    +
    +           // mark the subtask as temporarily idle if there are no initial seed partitions;
    +           // once this subtask discovers some partitions and starts collecting records, the subtask's
    +           // status will automatically be triggered back to be active.
    +           if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +                   sourceContext.markAsTemporarilyIdle();
                }
    -           else {
    -                   // this source never completes, so emit a Long.MAX_VALUE watermark
    -                   // to not block watermark forwarding
    -                   sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
     
    -                   // wait until this is canceled
    -                   final Object waitLock = new Object();
    +           // create the fetcher that will communicate with the Kafka brokers
    +           final AbstractFetcher<T, ?> fetcher = createFetcher(
    +                           sourceContext,
    +                           subscribedPartitionsToStartOffsets,
    +                           periodicWatermarkAssigner,
    +                           punctuatedWatermarkAssigner,
    +                           (StreamingRuntimeContext) getRuntimeContext(),
    +                           offsetCommitMode);
    +
    +           // publish the reference, for snapshot-, commit-, and cancel calls
    +           // IMPORTANT: We can only do that now, because only now will calls to
    +           //            the fetchers 'snapshotCurrentState()' method return at least
    +           //            the restored offsets
    +           this.kafkaFetcher = fetcher;
    +
    +           if (!running) {
    +                   return;
    +           }
    +
    +           // depending on whether we were restored with the current state version (1.3),
    +           // remaining logic branches off into 2 paths:
    +           //  1) New state - main fetcher loop executed as separate thread, with this
    +           //                 thread running the partition discovery loop
    +           //  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +           if (!restoredFromOldState) {
    +                   final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +                   Thread fetcherThread = new Thread(new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   try {
    +                                           // run the fetcher' main work method
    +                                           kafkaFetcher.runFetchLoop();
    --- End diff --
    
    Hmm, there actually isn't any good reason I can think of that this is
    required.
    
    One point regarding emitting from a non-main thread: the Kafka 0.8 fetcher
    has always emitted elements from different threads, so I didn't really
    assume anything about which thread (main or separate) runs the fetcher
    loop and which one runs the discovery loop.
    
    But I think it's also OK to swap this here.
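
For readers following the thread, here is a minimal, self-contained sketch of the pattern under discussion: one loop runs in a separate thread, the other runs in the calling thread, and a failure in the separate thread is handed back through an `AtomicReference`. It is not the code from the pull request. The class `FetcherThreadSketch`, the `Loop` interface, and `runBoth` are hypothetical stand-ins; only `fetcherErrorRef` and `fetcherThread` borrow their names from the diff above.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration only; not part of Flink. */
final class FetcherThreadSketch {

    /** Stand-in for a loop that may fail, such as a fetch loop or a discovery loop. */
    interface Loop {
        void run() throws Exception;
    }

    /**
     * Runs fetchLoop in a new thread and discoveryLoop in the calling thread.
     * After both have finished, rethrows the error recorded by the fetch thread, if any.
     */
    static void runBoth(Loop fetchLoop, Loop discoveryLoop) throws Exception {
        final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();

        Thread fetcherThread = new Thread(() -> {
            try {
                fetchLoop.run();
            } catch (Exception e) {
                // Keep only the first error so the caller sees the root failure.
                fetcherErrorRef.compareAndSet(null, e);
            }
        }, "fetcher-loop-sketch");
        fetcherThread.start();

        try {
            discoveryLoop.run();
        } finally {
            // Always wait for the fetch thread, even if the discovery loop failed.
            fetcherThread.join();
        }

        Exception fetcherError = fetcherErrorRef.get();
        if (fetcherError != null) {
            throw fetcherError;
        }
    }
}
```

Swapping the two roles, as suggested in the review, only means passing the loops in the opposite order. A real implementation would also need a way to wake up the loop in the calling thread as soon as the other thread fails; the sketch leaves that out.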


> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose implementing this feature as follows:
> Since the overall list of partitions to read will change after job 
> submission, the main change required for this feature is dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using the Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and will rely on Kafka 
> to dynamically reassign partitions whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after a rebalance. We'll use this callback to let each subtask 
> commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, once 
> the subtasks get the new partitions they'll be reading from, they'll read 
> from the external store to get the last offsets for those partitions 
> (partitions that don't have offset entries in the store are the new 
> partitions that caused the rebalance).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same: subtasks snapshot 
> the offsets of the partitions they are currently holding. Restoring will be 
> a bit different in that subtasks might not be assigned partitions matching 
> the snapshot they are restored with (since we're letting Kafka dynamically 
> assign partitions). There will need to be a coordination process where, if 
> a restore state exists, all subtasks first commit the offsets they receive 
> (as a result of the restore state) to the external store, and then all 
> subtasks attempt to find a last offset for the partitions they are holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to the previous global state, so coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of 
> topics), KafkaConsumer#subscribe() has a corresponding overload that takes 
> a fixed list of topics. We can simply decide which subscribe() overload to 
> use depending on whether a regex Pattern or a list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit a MAX_VALUE watermark, since they may hold partitions after a 
> rebalance. Instead, unassigned subtasks should run a fetcher instance too 
> and take part as a process pool for the consumer group of the subscribed 
> topics.
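
As a small illustration of the regex subscription idea, the sketch below filters a list of known topic names with a `java.util.regex.Pattern`. It is a plain-Java stand-in, not the connector or the Kafka client: `TopicPatternSketch` and `matchingTopics` are hypothetical names, and it assumes the whole topic name must match the pattern. Under that assumption the example "topic-n*" from the description reads as a glob rather than a regex; as a regex it would need to be written as something like `topic-n\d+` or `topic-n.*`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration only; not part of Flink or the Kafka client. */
final class TopicPatternSketch {

    /** Returns the known topics whose full name matches the given pattern, in input order. */
    static List<String> matchingTopics(List<String> knownTopics, Pattern pattern) {
        List<String> matched = new ArrayList<>();
        for (String topic : knownTopics) {
            // matches() requires the whole topic name to match, not just a prefix.
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

With the topics "topic-n1", "topic-n2", and "other", the pattern `topic-n\d+` selects the first two, while the literal regex `topic-n*` selects none of them, because it only matches "topic-" followed by zero or more "n" characters.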



