Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377415

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
         private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
             List<StreamShardHandle> shardsOfStream = new ArrayList<>();
    -        DescribeStreamResult describeStreamResult;
    +        // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +        // we need to use the returned nextToken to get additional shards.
    +        ListShardsResult listShardsResult;
    +        String startShardToken = null;
             do {
    -            describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +            listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --

    Thanks for catching this. I have fixed it by clearing shardsOfStream, so that we return an empty shardsOfStream in case of an ExpiredTokenException.

    The intended behavior is as follows: in the unlikely event that the next token has expired, we simply return an empty shardsOfStream. This should be alright, because an empty shardsOfStream is also what is returned by default when no new shards are discovered.
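
The behavior described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual KinesisProxy code: `ShardLister`, `Page`, and the local `ExpiredTokenException` class are hypothetical stand-ins for the AWS SDK client, result, and exception types, and the `lastSeenShardId` parameter is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardPagingSketch {

    /** Hypothetical stand-in for the SDK error raised when a pagination token has expired. */
    public static class ExpiredTokenException extends RuntimeException {}

    /** Hypothetical result page: shard IDs plus the token for the next page (null on the last page). */
    public static class Page {
        public final List<String> shardIds;
        public final String nextToken;

        public Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical client interface; the real code calls the Kinesis client instead. */
    public interface ShardLister {
        Page listShards(String streamName, String nextToken);
    }

    /**
     * Follows nextToken until the listing is exhausted. If the token expires
     * mid-listing, the partial result is discarded and an empty list is returned,
     * which callers already treat as "no new shards discovered".
     */
    public static List<String> getShardsOfStream(ShardLister lister, String streamName) {
        List<String> shardsOfStream = new ArrayList<>();
        String token = null;
        try {
            do {
                Page page = lister.listShards(streamName, token);
                shardsOfStream.addAll(page.shardIds);
                token = page.nextToken;
            } while (token != null);
        } catch (ExpiredTokenException e) {
            // Drop any shards collected so far so the caller never sees a partial listing.
            shardsOfStream.clear();
        }
        return shardsOfStream;
    }
}
```

Returning an empty list rather than a partial one keeps the caller's logic simple: the next discovery attempt starts a fresh listing, at the cost of delaying discovery of any shards seen before the token expired.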
---