[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348376#comment-15348376
 ] 

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68408986
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
    @@ -17,156 +17,481 @@
     
     package org.apache.flink.streaming.connectors.kinesis.internals;
     
    +import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    -import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
     import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
     import org.apache.flink.util.InstantiationUtil;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.Map;
    -import java.util.HashMap;
    +
    +import java.util.LinkedList;
     import java.util.List;
    -import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Map;
     import java.util.Properties;
    +
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
    - * The fetcher spawns a single thread for connection to each shard.
    + * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
    + * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
    + * the following:
    + * <ul>
    + *     <li>1. continously poll Kinesis to discover shards that the subtask 
should subscribe to. The subscribed subset
    + *                   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
    + *                   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
    + *                   to the same subset of shards even after 
restoring)</li>
    + *     <li>2. decide where in each discovered shard should the fetcher 
start subscribing to</li>
    + *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
    + * </ul>
    + *
    + * <p>The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
    + * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
    + * provided in this class.
      */
    -public class KinesisDataFetcher {
    +public class KinesisDataFetcher<T> {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
     
    -   /** Config properties for the Flink Kinesis Consumer */
    +   // 
------------------------------------------------------------------------
    +   //  Consumer-wide settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Configuration properties for the Flink Kinesis Consumer */
        private final Properties configProps;
     
    -   /** The name of the consumer task that this fetcher was instantiated */
    -   private final String taskName;
    +   /** The list of Kinesis streams that the consumer is subscribing to */
    +   private final List<String> streams;
    +
    +   /**
    +    * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
    +    * Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
    +    * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
    +    */
    +   private final KinesisDeserializationSchema<T> deserializationSchema;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Subtask-specific settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Runtime context of the subtask that this fetcher was created in */
    +   private final RuntimeContext runtimeContext;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Executor services to run created threads
    +   // 
------------------------------------------------------------------------
    +
    +   /** Executor service to run the {@link ShardDiscoverer} and {@link 
ShardSubscriber} */
    +   private final ExecutorService shardDiscovererAndSubscriberExecutor;
    +
    +   /** Executor service to run {@link ShardConsumer}s to consumer Kinesis 
shards */
    +   private final ExecutorService shardConsumersExecutor;
     
    -   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
    -   private HashMap<KinesisStreamShard, String> 
assignedShardsWithStartingSequenceNum;
    +   // 
------------------------------------------------------------------------
    +   //  Managed state, accessed and updated across multiple threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Reference to the thread that executed run() */
    -   private volatile Thread mainThread;
    +   /**
    +    * Blocking queue for newly discovered shards, with their states, that 
this fetcher should consume.
    +    * The {@link ShardDiscoverer} will add shards with initial position as 
state to this queue as shards are discovered,
    +    * while the {@link ShardSubscriber} polls this queue to start 
subscribing to the new discovered shards.
    +    */
    +   private final BlockingQueue<KinesisStreamShardState> pendingShards;
    +
    +   /**
    +    * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The shard
    +    * subscriber will add to this list as it polls pending shards. Shard 
consumer threads update the last processed
    +    * sequence number of subscribed shards as they fetch and process 
records.
    +    *
    +    * <p>Note that since multiple threads will be performing operations on 
this list, all operations must be wrapped in
    +    * synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} 
lock. For this purpose, all threads must use
    +    * the following thread-safe methods this class provides to operate on 
this list:
    +    * <ul>
    +    *     <li>{@link 
KinesisDataFetcher#addAndStartConsumingNewSubscribedShard(KinesisStreamShardState)}</li>
    +    *     <li>{@link KinesisDataFetcher#updateState(int, String)}</li>
    +    *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(Object, 
int, String)}</li>
    +    * </ul>
    +    */
    +   private final List<KinesisStreamShardState> subscribedShardsState;
    +
    +   private final SourceFunction.SourceContext<T> sourceContext;
    +
    +   /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
    +   private final Object checkpointLock;
     
    -   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
    +   /** This flag is set to true if the fetcher is provided a non-null and 
non-empty restored state */
    +   private final boolean isRestoredFromCheckpoint;
    +
    +   /** Reference to the first error thrown by any of the created threads */
        private final AtomicReference<Throwable> error;
     
        private volatile boolean running = true;
     
        /**
    -    * Creates a new Kinesis Data Fetcher for the specified set of shards
    +    * Creates a Kinesis Data Fetcher.
         *
    -    * @param assignedShards the shards that this fetcher will pull data 
from
    -    * @param configProps the configuration properties of this Flink 
Kinesis Consumer
    -    * @param taskName the task name of this consumer task
    +    * @param streams the streams to subscribe to
    +    * @param sourceContext context of the source function
    +    * @param runtimeContext this subtask's runtime context
    +    * @param configProps the consumer configuration properties
    +    * @param restoredState state of subcribed shards that the fetcher 
should restore to
    +    * @param deserializationSchema deserialization schema
         */
    -   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, 
Properties configProps, String taskName) {
    +   public KinesisDataFetcher(List<String> streams,
    +                                                   
SourceFunction.SourceContext<T> sourceContext,
    +                                                   RuntimeContext 
runtimeContext,
    +                                                   Properties configProps,
    +                                                   Map<KinesisStreamShard, 
String> restoredState,
    +                                                   
KinesisDeserializationSchema<T> deserializationSchema) {
    +           this(streams,
    +                   sourceContext,
    +                   sourceContext.getCheckpointLock(),
    +                   runtimeContext,configProps,
    +                   restoredState,
    +                   deserializationSchema,
    +                   new AtomicReference<Throwable>(),
    +                   new LinkedBlockingQueue<KinesisStreamShardState>(),
    +                   new LinkedList<KinesisStreamShardState>());
    +   }
    +
    +   /** This constructor is exposed for testing purposes */
    +   protected KinesisDataFetcher(List<String> streams,
    +                                                           
SourceFunction.SourceContext<T> sourceContext,
    +                                                           Object 
checkpointLock,
    +                                                           RuntimeContext 
runtimeContext,
    +                                                           Properties 
configProps,
    +                                                           
Map<KinesisStreamShard, String> restoredState,
    +                                                           
KinesisDeserializationSchema<T> deserializationSchema,
    +                                                           
AtomicReference<Throwable> error,
    +                                                           
LinkedBlockingQueue<KinesisStreamShardState> pendingShardsQueue,
    +                                                           
LinkedList<KinesisStreamShardState> subscribedShardsState) {
    +           this.streams = checkNotNull(streams);
                this.configProps = checkNotNull(configProps);
    -           this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    -           for (KinesisStreamShard shard : assignedShards) {
    -                   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
    +           this.sourceContext = checkNotNull(sourceContext);
    +           this.checkpointLock = checkNotNull(checkpointLock);
    +           this.runtimeContext = checkNotNull(runtimeContext);
    +           this.deserializationSchema = 
checkNotNull(deserializationSchema);
    +
    +           this.error = error;
    +           this.pendingShards = pendingShardsQueue;
    +           this.subscribedShardsState = subscribedShardsState;
    +
    +           this.shardDiscovererAndSubscriberExecutor =
    +                   
createShardDiscovererAndSubscriberThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +           this.shardConsumersExecutor =
    +                   
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +
    +           this.isRestoredFromCheckpoint = (restoredState != null && 
restoredState.entrySet().size() != 0);
    +
    +           // if there is state to restore from last checkpoint, we seed 
them as initially discovered shards
    +           if (isRestoredFromCheckpoint) {
    +                   seedPendingShardsWithRestoredState(restoredState, 
this.pendingShards);
                }
    -           this.taskName = taskName;
    -           this.error = new AtomicReference<>();
        }
     
        /**
    -    * Advance a shard's starting sequence number to a specified value
    +    * Starts the fetcher. After starting the fetcher, it can only
    +    * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
         *
    -    * @param streamShard the shard to perform the advance on
    -    * @param sequenceNum the sequence number to advance to
    +    * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
         */
    -   public void advanceSequenceNumberTo(KinesisStreamShard streamShard, 
String sequenceNum) {
    -           if 
(!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
    -                   throw new IllegalArgumentException("Can't advance 
sequence number on a shard we are not going to read.");
    +   public void runFetcher() throws Exception {
    +
    +           if (LOG.isInfoEnabled()) {
    +                   LOG.info("Subtask {} is starting the shard discoverer 
...", runtimeContext.getIndexOfThisSubtask());
    +           }
    +           shardDiscovererAndSubscriberExecutor.submit(new 
ShardDiscoverer<>(this));
    +
    +           // after this point we will start fetching data from Kinesis 
and update internal state,
    +           // so we check that we are running for the last time before 
continuing
    +           if (!running) {
    +                   return;
    +           }
    +
    +           if (LOG.isInfoEnabled()) {
    +                   LOG.info("Subtask {} is starting the shard subscriber 
...", runtimeContext.getIndexOfThisSubtask());
    +           }
    +           shardDiscovererAndSubscriberExecutor.submit(new 
ShardSubscriber<>(this));
    +
    +           while (running) {
    +                   // once either shutdownFetcher() or stopWithError()
    +                   // is called, we will escape this loop
    +           }
    --- End diff --
    
    Yup this is really bad ... will fix!


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to