Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377376
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
    @@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
         * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
         * @return the result of the describe stream operation
         */
    -   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
    -           final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
    -           describeStreamRequest.setStreamName(streamName);
    -           describeStreamRequest.setExclusiveStartShardId(startShardId);
    +   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
    +                                                                           
                                                                        
@Nullable String startNextToken)
    +                   throws InterruptedException {
    +           final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
    +           if (startNextToken == null) {
    +                   
listShardsRequest.setExclusiveStartShardId(startShardId);
    +                   listShardsRequest.setStreamName(streamName);
    +           } else {
    +                   // Note the nextToken returned by AWS expires within 
300 sec.
    +                   listShardsRequest.setNextToken(startNextToken);
    +           }
     
    -           DescribeStreamResult describeStreamResult = null;
    +           ListShardsResult listShardsResults = null;
     
    -           // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
    +           // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
                int attemptCount = 0;
    -           while (describeStreamResult == null) { // retry until we get a 
result
    +           // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
    +           // are taken up.
    +           while (listShardsResults == null) { // retry until we get a 
result
                        try {
    -                           describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
    +                           listShardsResults = 
kinesisClient.listShards(listShardsRequest);
                        } catch (LimitExceededException le) {
                                long backoffMillis = fullJitterBackoff(
    -                                   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -                           LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
    -                                   + backoffMillis + " millis.");
    +                                           listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +                                   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
    +                                                                   + ". 
Backing off for " + backoffMillis + " millis.");
                                Thread.sleep(backoffMillis);
    -                   } catch (ResourceNotFoundException re) {
    -                           throw new RuntimeException("Error while getting 
stream details", re);
    -                   }
    -           }
    -
    -           String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
    -           if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -                   if (LOG.isWarnEnabled()) {
    -                           LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
    -                                   "describeStream operation will not 
contain any shard information.");
    +                   } catch (ResourceInUseException reInUse) {
    +                           if (LOG.isWarnEnabled()) {
    +                                   // List Shards will throw an exception 
if stream is not in active state. Will return
    +                                   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
    +                                                   + "for the time being");
    +                                   break;
    +                           }
    +                   } catch (ResourceNotFoundException reNotFound) {
    +                           throw new RuntimeException("Stream not found. 
Error while getting shard list.", reNotFound);
    +                   } catch (InvalidArgumentException inArg) {
    +                           throw new RuntimeException("Invalid Arguments 
to listShards.", inArg);
    +                   } catch (ExpiredNextTokenException expiredToken) {
    +                           LOG.warn("List Shards has an expired token. 
Reusing the previous state.");
    +                           break;
                        }
                }
    -
    -           // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before the exclusive
    -           // start shard id in the returned shards list; check if we need 
to remove these erroneously returned shards
    -           if (startShardId != null) {
    -                   List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly 
exclude shards before
    --- End diff --
    
    Done.


---

Reply via email to