[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348281#comment-15348281 ]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68399467

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -122,29 +173,36 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
     	}

     	/**
    -	 * Get the list of shards associated with multiple Kinesis streams
    +	 * Get the complete shard list of multiple Kinesis streams.
     	 *
    -	 * @param streamNames the list of Kinesis streams
    -	 * @return a list of {@link KinesisStreamShard}s
    +	 * @param streamNames Kinesis streams to retrieve the shard list for
    +	 * @return shard list result
     	 */
    -	public List<KinesisStreamShard> getShardList(List<String> streamNames) {
    -		List<KinesisStreamShard> shardList = new ArrayList<>();
    +	public GetShardListResult getShardList(List<String> streamNames) throws InterruptedException {
    +		GetShardListResult result = new GetShardListResult();
     		for (String stream : streamNames) {
    -			DescribeStreamResult describeStreamResult;
    -			String lastSeenShardId = null;
    +			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, null));
    +		}
    +		return result;
    +	}

    -			do {
    -				describeStreamResult = describeStream(stream, lastSeenShardId);
    +	/**
    +	 * Get shard list of multiple Kinesis streams, ignoring the
    +	 * shards of each stream before a specified last seen shard id.
    +	 *
    +	 * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value
    +	 * @return shard list result
    +	 */
    +	public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException {
    --- End diff --

    Missing `@Override` annotation


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis users can choose to "merge" and "split" shards at any time for adjustable stream throughput capacity. This article explains it quite clearly: https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic version of the Kinesis consumer (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task mapping is a simple round-robin-like distribution that each Flink consumer task can determine locally (the Flink Kafka consumer does this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer tasks coordinate which shards they are currently handling, and allow a task to ask the coordinator for a shard reassignment when it discovers a closed shard at runtime (shards are closed by Kinesis when they are merged or split).
> We need a centralized coordinator state store that is visible to all Flink consumer tasks. Tasks can use this state store to locally determine which shards they can be reassigned.
> Amazon's KCL uses a DynamoDB table for this coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use KCL for the implementation of the consumer if we want to leverage Flink's checkpointing mechanics. For our own implementation, ZooKeeper could be used for this state store, but that would require the user to set up ZooKeeper for the consumer to work.
> Since this feature involves extensive work, it is opened as a separate sub-task from the basic implementation (https://issues.apache.org/jira/browse/FLINK-3229).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
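
As a rough illustration of the scheme sketched in the issue description above, here is a minimal Java sketch of a locally computable round-robin shard-to-subtask mapping, plus the point where a subtask would ask a coordinator for a reassignment after hitting a closed shard. All names below (Shard, ShardCoordinator, selectShardsForSubtask) are hypothetical and are not the connector's actual classes or APIs.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch (not the connector's actual code) of the static,
    // locally determined round-robin shard-to-subtask mapping described in the
    // issue, and of the hand-off to a coordinator once a subtask encounters a
    // shard that Kinesis has closed.
    public class ShardAssignmentSketch {

        // Placeholder shard descriptor; the real connector models this as KinesisStreamShard.
        public static class Shard {
            final String streamName;
            final String shardId;
            final boolean closed; // true once Kinesis has merged or split this shard away

            Shard(String streamName, String shardId, boolean closed) {
                this.streamName = streamName;
                this.shardId = shardId;
                this.closed = closed;
            }
        }

        // Round-robin-like distribution: every subtask computes its own share of the
        // full shard list without talking to the other subtasks.
        public static List<Shard> selectShardsForSubtask(
                List<Shard> allShards, int subtaskIndex, int parallelism) {
            List<Shard> assigned = new ArrayList<>();
            for (int i = 0; i < allShards.size(); i++) {
                if (i % parallelism == subtaskIndex) {
                    assigned.add(allShards.get(i));
                }
            }
            return assigned;
        }

        // Hypothetical coordinator backed by some shared state store (the open design
        // question in the issue) that is visible to all subtasks.
        public interface ShardCoordinator {
            // Report a closed shard and receive newly discovered shards to take over.
            List<Shard> requestReassignment(int subtaskIndex, Shard closedShard);
        }

        // Roughly what a subtask would do when a shard it owns turns out to be closed.
        public static List<Shard> onShardClosed(
                Shard closedShard, int subtaskIndex, ShardCoordinator coordinator) {
            // The subtask cannot resolve this locally: the shard's successors may belong
            // to other subtasks, so it asks the coordinator which shards it should pick up.
            return coordinator.requestReassignment(subtaskIndex, closedShard);
        }
    }

The getShardList(Map<String,String>) overload in the diff above would presumably serve the discovery side of this picture: passing each stream's last seen shard id retrieves only the shards opened after that point, which is the information a reassignment step like the one sketched here would need.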