[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996621#comment-15996621
 ] 

ASF GitHub Bot commented on FLINK-4022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114757153
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
 ---
    @@ -0,0 +1,254 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Base class for all partition discoverers.
    + *
    + * <p>This partition discoverer base class implements the logic around 
bookkeeping
    + * discovered partitions, and using the information to determine whether 
or not there
    + * are new partitions that the consumer subtask should subscribe to.
    + *
    + * <p>Subclass implementations should simply implement the logic of using 
the version-specific
    + * Kafka clients to fetch topic and partition metadata.
    + *
    + * <p>Since Kafka clients are generally not thread-safe, partition 
discoverers should
    + * not be concurrently accessed. The only exception for this would be the 
{@link #wakeup()}
    + * call, which allows the discoverer to be interrupted during a {@link 
#discoverPartitions()} call.
    + */
    +public abstract class AbstractPartitionDiscoverer {
    +
    +   /** Describes whether we are discovering partitions for fixed topics or 
a topic pattern. */
    +   private final KafkaTopicsDescriptor topicsDescriptor;
    +
    +   /** Index of the consumer subtask that this partition discoverer 
belongs to. */
    +   private final int indexOfThisSubtask;
    +
    +   /** The total number of consumer subtasks. */
    +   private final int numParallelSubtasks;
    +
    +   /** Flag to determine whether or not the discoverer is closed. */
    +   private volatile boolean closed = true;
    +
    +   /**
    +    * Flag to determine whether or not the discoverer had been woken up.
    +    * When set to {@code true}, {@link #discoverPartitions()} would be 
interrupted as early as possible.
    +    * Once interrupted, the flag is reset.
    +    */
    +   private volatile boolean wakeup;
    +
    +   /**
    +    * Map of topics to they're largest discovered partition id seen by 
this subtask.
    +    * This state may be updated whenever {@link 
AbstractPartitionDiscoverer#discoverPartitions()} or
    +    * {@link 
AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)}
 is called.
    +    *
    +    * This is used to remove old partitions from the fetched partition 
lists. It is sufficient
    +    * to keep track of only the largest partition id because Kafka 
partition numbers are only
    +    * allowed to be increased and has incremental ids.
    +    */
    +   private final Map<String, Integer> topicsToLargestDiscoveredPartitionId;
    +
    +   public AbstractPartitionDiscoverer(
    +                   KafkaTopicsDescriptor topicsDescriptor,
    +                   int indexOfThisSubtask,
    +                   int numParallelSubtasks) {
    +
    +           this.topicsDescriptor = checkNotNull(topicsDescriptor);
    +           this.indexOfThisSubtask = indexOfThisSubtask;
    +           this.numParallelSubtasks = numParallelSubtasks;
    +           this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
    +   }
    +
    +   /**
    +    * Opens the partition discoverer, initializing all required Kafka 
connections.
    +    *
    +    * <p>NOTE: thread-safety is not guaranteed.
    +    */
    +   public void open() throws Exception {
    +           closed = false;
    +           initializeConnections();
    +   }
    +
    +   /**
    +    * Closes the partition discoverer, cleaning up all Kafka connections.
    +    *
    +    * <p>NOTE: thread-safety is not guaranteed.
    +    */
    +   public void close() throws Exception {
    +           closed = true;
    +           closeConnections();
    +   }
    +
    +   /**
    +    * Interrupt an in-progress discovery attempt by throwing a {@link 
WakeupException}.
    +    * If no attempt is in progress, the immediate next attempt will throw 
a {@link WakeupException}.
    +    *
    +    * <p>This method can be called concurrently from a different thread.
    +    */
    +   public void wakeup() {
    +           wakeup = true;
    +           wakeupConnections();
    +   }
    +
    +   /**
    +    * Execute a partition discovery attempt for this subtask.
    +    * This method lets the partition discoverer update what partitions it 
has discovered so far.
    +    *
    +    * @return List of discovered new partitions that this subtask should 
subscribe to.
    +    */
    +   public List<KafkaTopicPartition> discoverPartitions() throws 
WakeupException, ClosedException {
    +           if (!closed && !wakeup) {
    +                   try {
    +                           List<KafkaTopicPartition> 
newDiscoveredPartitions;
    +
    +                           // (1) get all possible partitions, based on 
whether we are subscribed to fixed topics or a topic patern
    --- End diff --
    
    Typo "patern"


> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4022
>                 URL: https://issues.apache.org/jira/browse/FLINK-4022
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to