GitHub user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377415
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
        private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
                List<StreamShardHandle> shardsOfStream = new ArrayList<>();
     
    -           DescribeStreamResult describeStreamResult;
    +           // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +           // we need to use the returned nextToken to get additional shards.
    +           ListShardsResult listShardsResult;
    +           String startShardToken = null;
                do {
    -                   describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -                   List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +                   listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --
    
    Thanks for catching this. I have fixed it by clearing shardsOfStream, so 
that we return an empty shardsOfStream when an ExpiredTokenException is thrown.
    The behavior I intended is this: in the unlikely case that the next token 
has expired, we simply return an empty shardsOfStream. This should be fine, 
because when no new shards are discovered the method already returns an empty 
shardsOfStream by default, so callers see the same result in both cases.
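
    To make the intended behavior concrete, here is a minimal, self-contained 
sketch of the pagination loop. It is not the actual KinesisProxy code: the 
ShardLister interface, the Page class and the local ExpiredTokenException are 
hypothetical stand-ins for the AWS SDK types, and shards are reduced to plain 
shard ID strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardListingSketch {

    /** Hypothetical stand-in for the SDK exception thrown when a next token has expired. */
    static class ExpiredTokenException extends RuntimeException {
        ExpiredTokenException(String message) {
            super(message);
        }
    }

    /** Hypothetical result page: some shard IDs plus a token for the next page (null if none). */
    static class Page {
        final List<String> shardIds;
        final String nextToken;

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical client; a real implementation would call the Kinesis ListShards API. */
    interface ShardLister {
        Page listShards(String streamName, String nextToken);
    }

    /**
     * Collects all shards by following nextToken until it is null.
     * If the token expires midway, the partial result is cleared and an
     * empty list is returned, the same as when no new shards are found.
     */
    static List<String> getShardsOfStream(ShardLister lister, String streamName) {
        List<String> shardsOfStream = new ArrayList<>();
        String nextToken = null;
        do {
            Page page;
            try {
                page = lister.listShards(streamName, nextToken);
            } catch (ExpiredTokenException e) {
                // Discard the partial listing so the caller never sees half a stream.
                shardsOfStream.clear();
                break;
            }
            shardsOfStream.addAll(page.shardIds);
            nextToken = page.nextToken;
        } while (nextToken != null);
        return shardsOfStream;
    }

    public static void main(String[] args) {
        // Two pages, no failure: every shard is returned.
        ShardLister healthy = (stream, token) -> token == null
                ? new Page(Arrays.asList("shard-0", "shard-1"), "token-1")
                : new Page(Arrays.asList("shard-2"), null);
        System.out.println(getShardsOfStream(healthy, "demo-stream"));

        // The second call fails with an expired token: the result is empty.
        ShardLister expiring = (stream, token) -> {
            if (token != null) {
                throw new ExpiredTokenException("next token expired");
            }
            return new Page(Arrays.asList("shard-0"), "token-1");
        };
        System.out.println(getShardsOfStream(expiring, "demo-stream"));
    }
}
```

    The trade-off in this sketch is that an expired token costs one discovery 
round: the caller gets an empty list and would pick up the shards on its next 
attempt, rather than acting on a partial listing.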

