tzulitai commented on a change in pull request #7601: [FLINK-11164] Check for sentinel values when creating new Kinesis ShardIterator
URL: https://github.com/apache/flink/pull/7601#discussion_r252294884
 
 

 ##########
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -149,52 +150,72 @@ protected String getInitialShardIterator() throws Exception {
 
                 // before infinitely looping, we set the initial nextShardItr appropriately
 
-               if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+               if (isSentinelSequenceNumber(lastSequenceNum)) {
+                       nextShardItr = getShardIteratorForSentinel(lastSequenceNum);
+               } else {
+                       // we will be starting from an actual sequence number (due to restore from failure).
+                       // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
 
 Review comment:
   The comments here about record aggregation should go into the `getShardIteratorForRealSequenceNumber` method, since that is where the aggregated-record handling happens.
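
   For illustration only, a minimal self-contained sketch of that placement. `ShardIteratorSketch`, `SeqNum`, the two sentinel constants, and the returned strings are hypothetical stand-ins, not the connector's actual classes or the real Kinesis calls; only the method names `isSentinelSequenceNumber`, `getShardIteratorForSentinel` and `getShardIteratorForRealSequenceNumber` come from this PR. The point is just that the caller stays a plain dispatch, and the aggregation comments sit next to the code that handles aggregation.

```java
// Hypothetical stand-ins; this is NOT the actual Flink Kinesis connector code.
final class ShardIteratorSketch {

    /** Simplified sequence number; a negative sub-sequence number means "not aggregated" (assumption). */
    static final class SeqNum {
        final String value;
        final long subSequenceNumber;

        SeqNum(String value, long subSequenceNumber) {
            this.value = value;
            this.subSequenceNumber = subSequenceNumber;
        }

        boolean isAggregated() {
            return subSequenceNumber >= 0;
        }
    }

    // Assumed sentinel markers, used only for this sketch.
    static final String SENTINEL_LATEST = "LATEST_SEQUENCE_NUM";
    static final String SENTINEL_EARLIEST = "EARLIEST_SEQUENCE_NUM";

    static boolean isSentinelSequenceNumber(SeqNum n) {
        return SENTINEL_LATEST.equals(n.value) || SENTINEL_EARLIEST.equals(n.value);
    }

    /** The caller only dispatches; it carries no aggregation-specific comments. */
    static String getInitialShardIterator(SeqNum last) {
        if (isSentinelSequenceNumber(last)) {
            return getShardIteratorForSentinel(last);
        } else {
            return getShardIteratorForRealSequenceNumber(last);
        }
    }

    static String getShardIteratorForSentinel(SeqNum sentinel) {
        // Placeholder return values standing in for a real shard iterator.
        return SENTINEL_LATEST.equals(sentinel.value) ? "LATEST" : "TRIM_HORIZON";
    }

    static String getShardIteratorForRealSequenceNumber(SeqNum last) {
        // We are starting from an actual sequence number (due to restore from failure).
        // If it refers to an aggregated record, start AT that record so that any
        // dangling sub-records can be cleaned up; otherwise start AFTER it.
        if (last.isAggregated()) {
            return "AT_SEQUENCE_NUMBER:" + last.value;
        }
        return "AFTER_SEQUENCE_NUMBER:" + last.value;
    }
}
```

   With this shape, `getInitialShardIterator` reads as a two-way branch, and anyone changing the aggregated-record behaviour finds the explanation in the method they are editing.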
