tzulitai commented on a change in pull request #7601: [FLINK-11164] Check for sentinel values when creating new Kinesis ShardIterator
URL: https://github.com/apache/flink/pull/7601#discussion_r252300289
 
 

 ##########
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -381,13 +402,15 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
                        } catch (ExpiredIteratorException eiEx) {
                                LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                                        " refreshing the iterator ...", shardItr, subscribedShard);
-                               if (SentinelSequenceNumber.getAllSentinelSequenceNumbers().contains(lastSequenceNum)) {
+                               if (isSentinelSequenceNumber(lastSequenceNum)) {
 
 Review comment:
   This if-else branch is now basically identical to the `getInitialShardIterator` method, except that here `lastSequenceNum` will never be in the middle of an aggregated record (which is fine).
   I don't think the new warning log adds much value, TBH.
   
   So, maybe rename `getInitialShardIterator` to just `getShardIterator` and use it here.
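
   A rough sketch of what the shared method could look like, to illustrate the suggestion. Everything below is a simplified stand-in and an assumption on my part: the class name, the string constants, and the returned request description are hypothetical, and only `isSentinelSequenceNumber` and `getShardIterator` are names taken from this discussion. The aggregated-record handling is left out on purpose.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Sketch only: simplified stand-ins, not the actual Flink or AWS classes. */
class ShardIteratorSketch {

    // Hypothetical markers standing in for the connector's sentinel sequence numbers.
    static final String SENTINEL_EARLIEST = "EARLIEST_SEQUENCE_NUM";
    static final String SENTINEL_LATEST = "LATEST_SEQUENCE_NUM";

    static final Set<String> SENTINELS =
            new HashSet<>(Arrays.asList(SENTINEL_EARLIEST, SENTINEL_LATEST));

    static boolean isSentinelSequenceNumber(String sequenceNum) {
        return SENTINELS.contains(sequenceNum);
    }

    /**
     * Single entry point for both the initial iterator and the refresh after an
     * expired iterator. Returns a textual description of the request that would
     * be sent, so the sketch stays runnable without any Kinesis dependency.
     */
    static String getShardIterator(String shardId, String sequenceNum) {
        if (isSentinelSequenceNumber(sequenceNum)) {
            // A sentinel is not a real sequence number, so map it to an iterator type.
            String type = SENTINEL_EARLIEST.equals(sequenceNum) ? "TRIM_HORIZON" : "LATEST";
            return shardId + ":" + type;
        }
        // A real sequence number: continue right after the last record read.
        return shardId + ":AFTER_SEQUENCE_NUMBER:" + sequenceNum;
    }
}
```

   With something along these lines, the `ExpiredIteratorException` handler would only need to call `getShardIterator` with `lastSequenceNum` instead of repeating the sentinel check inline.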
