tzulitai commented on a change in pull request #7601: [FLINK-11164] Check for sentinel values when creating new Kinesis ShardIterator URL: https://github.com/apache/flink/pull/7601#discussion_r252296340
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ########## @@ -149,52 +150,72 @@ protected String getInitialShardIterator() throws Exception { // before infinitely looping, we set the initial nextShardItr appropriately - if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { + if (isSentinelSequenceNumber(lastSequenceNum)) { + nextShardItr = getShardIteratorForSentinel(lastSequenceNum); + } else { + // we will be starting from an actual sequence number (due to restore from failure). + // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records + // from the last aggregated record; otherwise, we can simply start iterating from the record right after. + + nextShardItr = getShardIteratorForRealSequenceNumber(lastSequenceNum); + } + return nextShardItr; + } + + protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException { + String nextShardItr; + + if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { // if the shard is already closed, there will be no latest next record to get for this shard if (subscribedShard.isClosed()) { nextShardItr = null; } else { nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); } - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { + } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { nextShardItr = 
null; - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { + } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp); } else { - // we will be starting from an actual sequence number (due to restore from failure). - // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records - // from the last aggregated record; otherwise, we can simply start iterating from the record right after. + throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber); + } - if (lastSequenceNum.isAggregated()) { - String itrForLastAggregatedRecord = - kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + return nextShardItr; + } - // get only the last aggregated record - GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); + protected String getShardIteratorForRealSequenceNumber(SequenceNumber sequenceNumber) throws Exception { + String nextShardItr; - List<UserRecord> fetchedRecords = deaggregateRecords( + if (sequenceNumber.isAggregated()) { Review comment: While you're refactoring this, could we move the logic in the `sequenceNumber.isAggregated()` branch into a separate method, or invert the if-else branches? This is just for better code readability. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services