[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348376#comment-15348376 ]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2131#discussion_r68408986

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,156 +17,481 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;

+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;

 import static org.apache.flink.util.Preconditions.checkNotNull;

 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask
+ * instantiates and runs a single fetcher throughout the subtask's lifetime. The fetcher runs several threads to
+ * accomplish the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *     of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *     subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *     to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard the fetcher should start subscribing</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) pending shards for subscription, and 2) the last processed sequence number of
+ * each subscribed shard. All operations on these states from multiple threads should only be done using the handler
+ * methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {

     private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);

-    /** Config properties for the Flink Kinesis Consumer */
+    // ------------------------------------------------------------------------
+    //  Consumer-wide settings
+    // ------------------------------------------------------------------------
+
+    /** Configuration properties for the Flink Kinesis Consumer */
     private final Properties configProps;

-    /** The name of the consumer task that this fetcher was instantiated */
-    private final String taskName;
+    /** The list of Kinesis streams that the consumer is subscribing to */
+    private final List<String> streams;
+
+    /**
+     * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+     * Note that since this might not be thread-safe, multiple threads in the fetcher using this must
+     * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+     */
+    private final KinesisDeserializationSchema<T> deserializationSchema;
+
+    // ------------------------------------------------------------------------
+    //  Subtask-specific settings
+    // ------------------------------------------------------------------------
+
+    /** Runtime context of the subtask that this fetcher was created in */
+    private final RuntimeContext runtimeContext;
+
+    // ------------------------------------------------------------------------
+    //  Executor services to run created threads
+    // ------------------------------------------------------------------------
+
+    /** Executor service to run the {@link ShardDiscoverer} and {@link ShardSubscriber} */
+    private final ExecutorService shardDiscovererAndSubscriberExecutor;
+
+    /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+    private final ExecutorService shardConsumersExecutor;

-    /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
-    private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+    // ------------------------------------------------------------------------
+    //  Managed state, accessed and updated across multiple threads
+    // ------------------------------------------------------------------------

-    /** Reference to the thread that executed run() */
-    private volatile Thread mainThread;
+    /**
+     * Blocking queue for newly discovered shards, with their states, that this fetcher should consume.
+     * The {@link ShardDiscoverer} will add shards with initial position as state to this queue as shards are
+     * discovered, while the {@link ShardSubscriber} polls this queue to start subscribing to the newly discovered
+     * shards.
+     */
+    private final BlockingQueue<KinesisStreamShardState> pendingShards;
+
+    /**
+     * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The shard
+     * subscriber will add to this list as it polls pending shards. Shard consumer threads update the last processed
+     * sequence number of subscribed shards as they fetch and process records.
+     *
+     * <p>Note that since multiple threads will be performing operations on this list, all operations must be wrapped in
+     * synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose, all threads must use
+     * the following thread-safe methods this class provides to operate on this list:
+     * <ul>
+     *     <li>{@link KinesisDataFetcher#addAndStartConsumingNewSubscribedShard(KinesisStreamShardState)}</li>
+     *     <li>{@link KinesisDataFetcher#updateState(int, String)}</li>
+     *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(Object, int, String)}</li>
+     * </ul>
+     */
+    private final List<KinesisStreamShardState> subscribedShardsState;
+
+    private final SourceFunction.SourceContext<T> sourceContext;
+
+    /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+    private final Object checkpointLock;

-    /** Reference to the first error thrown by any of the spawned shard connection threads */
+    /** This flag is set to true if the fetcher is provided a non-null and non-empty restored state */
+    private final boolean isRestoredFromCheckpoint;
+
+    /** Reference to the first error thrown by any of the created threads */
     private final AtomicReference<Throwable> error;

     private volatile boolean running = true;

     /**
-     * Creates a new Kinesis Data Fetcher for the specified set of shards
+     * Creates a Kinesis Data Fetcher.
      *
-     * @param assignedShards the shards that this fetcher will pull data from
-     * @param configProps the configuration properties of this Flink Kinesis Consumer
-     * @param taskName the task name of this consumer task
+     * @param streams the streams to subscribe to
+     * @param sourceContext context of the source function
+     * @param runtimeContext this subtask's runtime context
+     * @param configProps the consumer configuration properties
+     * @param restoredState state of subscribed shards that the fetcher should restore to
+     * @param deserializationSchema deserialization schema
      */
-    public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+    public KinesisDataFetcher(List<String> streams,
+            SourceFunction.SourceContext<T> sourceContext,
+            RuntimeContext runtimeContext,
+            Properties configProps,
+            Map<KinesisStreamShard, String> restoredState,
+            KinesisDeserializationSchema<T> deserializationSchema) {
+        this(streams,
+            sourceContext,
+            sourceContext.getCheckpointLock(),
+            runtimeContext,
+            configProps,
+            restoredState,
+            deserializationSchema,
+            new AtomicReference<Throwable>(),
+            new LinkedBlockingQueue<KinesisStreamShardState>(),
+            new LinkedList<KinesisStreamShardState>());
+    }
+
+    /** This constructor is exposed for testing purposes */
+    protected KinesisDataFetcher(List<String> streams,
+            SourceFunction.SourceContext<T> sourceContext,
+            Object checkpointLock,
+            RuntimeContext runtimeContext,
+            Properties configProps,
+            Map<KinesisStreamShard, String> restoredState,
+            KinesisDeserializationSchema<T> deserializationSchema,
+            AtomicReference<Throwable> error,
+            LinkedBlockingQueue<KinesisStreamShardState> pendingShardsQueue,
+            LinkedList<KinesisStreamShardState> subscribedShardsState) {
+        this.streams = checkNotNull(streams);
         this.configProps = checkNotNull(configProps);
-        this.assignedShardsWithStartingSequenceNum = new HashMap<>();
-        for (KinesisStreamShard shard : assignedShards) {
-            assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+        this.sourceContext = checkNotNull(sourceContext);
+        this.checkpointLock = checkNotNull(checkpointLock);
+        this.runtimeContext = checkNotNull(runtimeContext);
+        this.deserializationSchema = checkNotNull(deserializationSchema);
+
+        this.error = error;
+        this.pendingShards = pendingShardsQueue;
+        this.subscribedShardsState = subscribedShardsState;
+
+        this.shardDiscovererAndSubscriberExecutor =
+            createShardDiscovererAndSubscriberThreadPool(runtimeContext.getTaskNameWithSubtasks());
+        this.shardConsumersExecutor =
+            createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+
+        this.isRestoredFromCheckpoint = (restoredState != null && restoredState.entrySet().size() != 0);
+
+        // if there is state to restore from last checkpoint, we seed them as initially discovered shards
+        if (isRestoredFromCheckpoint) {
+            seedPendingShardsWithRestoredState(restoredState, this.pendingShards);
         }
-        this.taskName = taskName;
-        this.error = new AtomicReference<>();
     }

     /**
-     * Advance a shard's starting sequence number to a specified value
+     * Starts the fetcher. After starting the fetcher, it can only
+     * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
      *
-     * @param streamShard the shard to perform the advance on
-     * @param sequenceNum the sequence number to advance to
+     * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
      */
-    public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) {
-        if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
-            throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read.");
+    public void runFetcher() throws Exception {
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Subtask {} is starting the shard discoverer ...", runtimeContext.getIndexOfThisSubtask());
+        }
+        shardDiscovererAndSubscriberExecutor.submit(new ShardDiscoverer<>(this));
+
+        // after this point we will start fetching data from Kinesis and update internal state,
+        // so we check one last time that we are still running before continuing
+        if (!running) {
+            return;
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Subtask {} is starting the shard subscriber ...", runtimeContext.getIndexOfThisSubtask());
+        }
+        shardDiscovererAndSubscriberExecutor.submit(new ShardSubscriber<>(this));
+
+        while (running) {
+            // once either shutdownFetcher() or stopWithError()
+            // is called, we will escape this loop
+        }
--- End diff --

Yup this is really bad ... will fix!
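For context on the remark above: the while (running) {} loop busy-spins the subtask's main thread for the source's entire lifetime. Below is a minimal sketch of a blocking alternative, assuming a dedicated monitor object plus the error reference and running flag the fetcher already keeps; the class and method names are illustrative only, not the fix that actually landed in the PR.

    import java.util.concurrent.atomic.AtomicReference;

    /** Sketch only: a blocking replacement for the busy-wait loop flagged above. */
    public class FetcherMainLoopSketch {

        private final Object shutdownWaiter = new Object();
        private final AtomicReference<Throwable> error = new AtomicReference<>();
        private volatile boolean running = true;

        /** What the main fetcher thread would run instead of spinning on while (running) {}. */
        public void awaitTermination() throws Exception {
            synchronized (shutdownWaiter) {
                while (running) {
                    // blocks without burning CPU; woken up by stop(...) below
                    shutdownWaiter.wait();
                }
            }
            // surface the first error recorded by any of the worker threads
            Throwable t = error.get();
            if (t != null) {
                throw new Exception(t);
            }
        }

        /** Called from shutdownFetcher() or by a worker thread reporting an error. */
        public void stop(Throwable cause) {
            if (cause != null) {
                error.compareAndSet(null, cause);
            }
            running = false;
            synchronized (shutdownWaiter) {
                shutdownWaiter.notifyAll();
            }
        }
    }

A CountDownLatch would serve equally well; the point is that the main thread blocks until shutdownFetcher() or a worker error wakes it, rather than spinning.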
> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and
> "split" shards at any time for adjustable stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping is done in a simple
> round-robin-like distribution which can be locally determined at each Flink consumer task (the Flink Kafka
> consumer does this too).
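As an illustration of the locally determined, round-robin-like distribution described above, here is a sketch under the assumption that the assignment is a pure function of the shard ID and the subtask count; the names are hypothetical and this is not the connector's actual code.

    import java.util.ArrayList;
    import java.util.List;

    /** Sketch only: every subtask evaluates the same pure function over the full shard list. */
    public class ShardAssignmentSketch {

        /** Returns the subset of shards the given subtask should consume. */
        public static List<String> assignShards(List<String> allShardIds, int numSubtasks, int subtaskIndex) {
            List<String> assigned = new ArrayList<>();
            for (String shardId : allShardIds) {
                // the same deterministic computation runs on every subtask,
                // so no two subtasks claim the same shard and no coordination is needed
                if (Math.abs(shardId.hashCode() % numSubtasks) == subtaskIndex) {
                    assigned.add(shardId);
                }
            }
            return assigned;
        }
    }

Because each subtask computes the mapping independently, no coordination is needed; it is precisely this static, locally computed scheme that resharding breaks, since the shard list itself changes at runtime.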
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards
> they are currently handling, and allow a task to ask the coordinator for a shard reassignment when it
> encounters a closed shard at runtime (Kinesis closes shards when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink consumer tasks. Each task can use
> this state store to locally determine which shards it can be reassigned. Amazon KCL uses a DynamoDB table for
> the coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't
> use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our
> own implementation, ZooKeeper can be used for this state store, but that means users would additionally have
> to set up ZooKeeper for the consumer to work.
> Since this feature involves extensive work, it is opened as a separate sub-task from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.
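One runtime detail behind the closed-shard handling mentioned above: assuming the AWS Java SDK v1 model classes that the connector builds on, a consumer task can recognize a closed shard because Kinesis seals the shard's sequence number range after a merge or split. A sketch:

    import com.amazonaws.services.kinesis.model.Shard;

    /** Sketch only: how a consumer task could detect a closed shard at runtime. */
    public class ClosedShardCheckSketch {

        public static boolean isClosed(Shard shard) {
            // Kinesis sets an ending sequence number on a shard only after the
            // shard has been closed by a merge or a split; open shards have none
            return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
        }
    }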