Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788338
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
    @@ -353,19 +355,21 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
        private List<StreamShardHandle> getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
                List<StreamShardHandle> shardsOfStream = new ArrayList<>();
     
    -           DescribeStreamResult describeStreamResult;
    +           // List Shards returns just the first 1000 shard entries. In 
order to read the entire stream,
    +           // we need to use the returned nextToken to get additional 
shards.
    +           ListShardsResult listShardsResult;
    +           String startShardToken = null;
                do {
    -                   describeStreamResult = describeStream(streamName, 
lastSeenShardId);
    -
    -                   List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
    +                   listShardsResult = listShards(streamName, 
lastSeenShardId, startShardToken);
    --- End diff --
    
    if within `listShards(...)` we caught the `ExpiredNextTokenException`, then 
`null` will be returned as the result, correct?
    If so, then the current built up `shardsIfStream` will be returned 
immediately, regardless of whether or not there are more shards following.
    
    Although it might not be too common that we have expired tokens here, I 
wonder if we can handle this case more gracefully (e.g., re-fetching a token to 
make sure that there really is no more shards).


---

Reply via email to