[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348383#comment-15348383 ]
ASF GitHub Bot commented on FLINK-3231: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r68409331 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -159,42 +217,65 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator(); } + private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException { + List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); + + DescribeStreamResult describeStreamResult; + do { + describeStreamResult = describeStream(streamName, lastSeenShardId); + + List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); + for (Shard shard : shards) { + shardsOfStream.add(new KinesisStreamShard(streamName, shard)); + } + + if (shards.size() != 0) { + lastSeenShardId = shards.get(shards.size() - 1).getShardId(); + } + } while (describeStreamResult.getStreamDescription().isHasMoreShards()); + + return shardsOfStream; + } + /** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * + * This method is using a "full jitter" approach described in + * <a href="http://google.com">https://www.awsarchitectureblog.com/2015/03/backoff.html</a>. This is necessary + * because concurrent calls will be made by all parallel subtask's {@link ShardDiscoverer}s. This jitter backoff + * approach will help distribute calls across the discoverers over time. 
+ * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, String startShardId) { + private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; String streamStatus = null; - int remainingRetryTimes = Integer.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES))); - long describeStreamBackoffTimeInMillis = Long.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF))); - // Call DescribeStream, with backoff and retries (if we get LimitExceededException). - while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) { + // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + Random seed = null; + int attemptCount = 0; + while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); } catch (LimitExceededException le) { - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". 
Backing off for " - + describeStreamBackoffTimeInMillis + " millis."); - try { - Thread.sleep(describeStreamBackoffTimeInMillis); - } catch (InterruptedException ie) { - LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie); + if (seed == null) { + seed = new Random(); } + long backoffMillis = fullJitterBackoff( + describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++, seed); + LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " + + backoffMillis + " millis."); + Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } - remainingRetryTimes--; } if (streamStatus == null) { --- End diff -- Actually, since we'll be retrying until we get a describeStreamResult now, this message and RuntimeException can be removed. > Handle Kinesis-side resharding in Kinesis streaming consumer > ------------------------------------------------------------ > > Key: FLINK-3231 > URL: https://issues.apache.org/jira/browse/FLINK-3231 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors > Affects Versions: 1.1.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > A big difference between Kinesis shards and Kafka partitions is that Kinesis > users can choose to "merge" and "split" shards at any time for adjustable > stream throughput capacity. This article explains this quite clearly: > https://brandur.org/kinesis-by-example. > This will break the static shard-to-task mapping implemented in the basic > version of the Kinesis consumer > (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task > mapping is done in a simple round-robin-like distribution which can be > locally determined at each Flink consumer task (Flink Kafka consumer does > this too). 
> To handle Kinesis resharding, we will need some way to let the Flink consumer > tasks coordinate which shards they are currently handling, and allow the > tasks to ask the coordinator for a shard reassignment when a task finds > out it has encountered a closed shard at runtime (a shard is closed by Kinesis > when it is merged or split). > We need a centralized coordinator state store which is visible to all Flink > consumer tasks. Tasks can use this state store to locally determine which > shards they can be reassigned. Amazon KCL uses a DynamoDB table for the > coordination, but as described in > https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use > KCL for the implementation of the consumer if we want to leverage Flink's > checkpointing mechanics. For our own implementation, ZooKeeper can be used > for this state store, but that would require the user to set up ZK > for the consumer to work. > Since this feature introduces extensive work, it is opened as a separate > sub-task from the basic implementation > https://issues.apache.org/jira/browse/FLINK-3229. -- This message was sent by Atlassian JIRA (v6.3.4#6332)