Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377376 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. + while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); + break; + } + } catch (ResourceNotFoundException reNotFound) { + throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound); + } catch (InvalidArgumentException inArg) { + throw new RuntimeException("Invalid Arguments to listShards.", inArg); + } catch (ExpiredNextTokenException expiredToken) { + LOG.warn("List Shards has an expired token. Reusing the previous state."); + break; } } - - // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive - // start shard id in the returned shards list; check if we need to remove these erroneously returned shards - if (startShardId != null) { - List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); + // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before --- End diff -- Done.
---