Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788338

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
  private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
      List<StreamShardHandle> shardsOfStream = new ArrayList<>();

-     DescribeStreamResult describeStreamResult;
+     // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+     // we need to use the returned nextToken to get additional shards.
+     ListShardsResult listShardsResult;
+     String startShardToken = null;
      do {
-         describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-         List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+         listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --

If we catch the `ExpiredNextTokenException` within `listShards(...)`, then `null` will be returned as the result, correct?
If so, the `shardsOfStream` list built up so far will be returned immediately, regardless of whether there are more shards following.

Although expired tokens might not be too common here, I wonder if we can handle this case more gracefully (e.g., by re-fetching a token to make sure that there really are no more shards).
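
To illustrate the idea, here is a minimal, self-contained sketch of one possible approach. It does not use the AWS SDK or the actual `KinesisProxy` code: `ShardLister`, `Page`, `ExpiredTokenException`, and `listAllShards` are hypothetical stand-ins, and the sketch assumes that listing can be resumed after a given shard ID once the token is dropped. When the token expires, the loop discards it and restarts the listing after the last shard collected so far, instead of returning a possibly incomplete list.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardPagingSketch {

    /** Hypothetical stand-in for an "expired next token" error (unchecked). */
    static class ExpiredTokenException extends RuntimeException {}

    /** Hypothetical result page: shard IDs plus an optional continuation token. */
    static class Page {
        final List<String> shardIds;
        final String nextToken; // null when there are no more pages

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /**
     * Hypothetical client. If nextToken is non-null it continues from the token;
     * otherwise it lists shards after exclusiveStartShardId (or from the start if null).
     */
    interface ShardLister {
        Page listShards(String exclusiveStartShardId, String nextToken);
    }

    static List<String> listAllShards(ShardLister lister, String lastSeenShardId, int maxTokenRetries) {
        List<String> all = new ArrayList<>();
        String startShardId = lastSeenShardId;
        String token = null;
        int retries = 0;

        while (true) {
            Page page;
            try {
                page = lister.listShards(startShardId, token);
            } catch (ExpiredTokenException e) {
                // Without a token there is nothing to re-fetch; also give up after too many retries.
                if (token == null || retries >= maxTokenRetries) {
                    throw e;
                }
                retries++;
                // Drop the expired token and resume after the last shard we already collected.
                token = null;
                continue;
            }

            all.addAll(page.shardIds);
            if (!page.shardIds.isEmpty()) {
                startShardId = page.shardIds.get(page.shardIds.size() - 1);
            }
            token = page.nextToken;
            if (token == null) {
                break; // the service reported no further pages
            }
        }
        return all;
    }
}
```

The point of the sketch is only that the caller can tell "no more shards" (a `null` next token on a successful page) apart from "the token expired", which the current `null` return value conflates.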
---