tzulitai commented on a change in pull request #7601: [FLINK-11164] Check for 
sentinel values when creating new Kinesis ShardIterator
URL: https://github.com/apache/flink/pull/7601#discussion_r252296340
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -149,52 +150,72 @@ protected String getInitialShardIterator() throws 
Exception {
 
                // before infinitely looping, we set the initial nextShardItr 
appropriately
 
-               if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
+               if (isSentinelSequenceNumber(lastSequenceNum)) {
+                       nextShardItr = 
getShardIteratorForSentinel(lastSequenceNum);
+               } else {
+                       // we will be starting from an actual sequence number 
(due to restore from failure).
+                       // if the last sequence number refers to an aggregated 
record, we need to clean up any dangling sub-records
+                       // from the last aggregated record; otherwise, we can 
simply start iterating from the record right after.
+
+                       nextShardItr = 
getShardIteratorForRealSequenceNumber(lastSequenceNum);
+               }
+               return nextShardItr;
+       }
+
+       protected String getShardIteratorForSentinel(SequenceNumber 
sentinelSequenceNumber) throws InterruptedException {
+               String nextShardItr;
+
+               if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
                        // if the shard is already closed, there will be no 
latest next record to get for this shard
                        if (subscribedShard.isClosed()) {
                                nextShardItr = null;
                        } else {
                                nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
                        }
-               } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
+               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
                        nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
-               } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
+               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
                        nextShardItr = null;
-               } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
                        nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
                } else {
-                       // we will be starting from an actual sequence number 
(due to restore from failure).
-                       // if the last sequence number refers to an aggregated 
record, we need to clean up any dangling sub-records
-                       // from the last aggregated record; otherwise, we can 
simply start iterating from the record right after.
+                       throw new RuntimeException("Unknown sentinel type: " + 
sentinelSequenceNumber);
+               }
 
-                       if (lastSequenceNum.isAggregated()) {
-                               String itrForLastAggregatedRecord =
-                                       
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
+               return nextShardItr;
+       }
 
-                               // get only the last aggregated record
-                               GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
+       protected String getShardIteratorForRealSequenceNumber(SequenceNumber 
sequenceNumber) throws Exception {
+               String nextShardItr;
 
-                               List<UserRecord> fetchedRecords = 
deaggregateRecords(
+               if (sequenceNumber.isAggregated()) {
 
 Review comment:
   While you're refactoring this:
   Can we move the logic in the `sequenceNumber.isAggregated()` branch into a 
separate method?
   Or, alternatively, invert the if-else branch.
   
   This is just for better code readability.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to