leekeiabstraction commented on code in PR #195:
URL: https://github.com/apache/flink-connector-aws/pull/195#discussion_r2031062113


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##########
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
         this.kinesis = kinesisProxy;
         this.configuration = configuration;
         this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+        this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+        this.idleSourceGetRecordsIntervalMillis =
+                configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
     }
 
     @Override
-    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException {
+        if (skipUntilScheduledGetRecordTime(splitState)) {
+            return null;
+        }
+
         GetRecordsResponse getRecordsResponse =
                 kinesis.getRecords(
                         splitState.getStreamArn(),
                         splitState.getShardId(),
                         splitState.getNextStartingPosition(),
                         this.maxRecordsToGet);
+
+        scheduleNextGetRecord(splitState, getRecordsResponse);
+
         boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
         return new RecordBatch(
                 getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
     }
 
+    private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState)
+            throws IOException {
+        if (scheduledGetRecordTimes.containsKey(splitState)
+                && scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) {
+            try {
+                Thread.sleep(1);

Review Comment:
   To elaborate on Abhi's point: this prevents KinesisShardSplitReaderBase
from looping through fetch() for each split without any pause when sources are
idle, which would cause very high CPU usage.
   
   Adding a 1ms pause lowers CPU usage at the cost of roughly 1ms of wait time
per skipped split. There is an edge case: a source with a large number of idle
assigned splits will not loop through all of its assigned splits in a timely
manner, which adds latency to GetRecords for a minority of non-idle shards.
However, it would take upwards of 1000 assigned splits to incur a 1 second
delay. Assigning upwards of 1000 active splits to a single source subtask is an
unlikely and impractical scenario (the app should really have much higher
parallelism).
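
The arithmetic behind the 1000-split figure can be sketched as follows. This is a minimal illustration, not the connector's code: the class `IdleSplitPauseSketch`, its methods, and the string split IDs are hypothetical, and it assumes each skipped split costs about one pause interval and that every idle split is skipped once per pass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: estimates the extra latency one pass over the assigned
// splits can add when every idle split is skipped with a short pause.
final class IdleSplitPauseSketch {

    // Assumed pause per skipped split, mirroring the Thread.sleep(1) in the diff.
    static final long PAUSE_MILLIS = 1L;

    // Worst case for one pass: every idle split is skipped and pays the pause once.
    static long worstCasePassDelayMillis(int idleSplits, long pauseMillis) {
        return idleSplits * pauseMillis;
    }

    // Same shape of check as in the diff: skip a split whose next scheduled
    // GetRecords time is still in the future.
    static boolean shouldSkip(Map<String, Long> scheduledTimes, String splitId, long nowMillis) {
        Long scheduled = scheduledTimes.get(splitId);
        return scheduled != null && scheduled > nowMillis;
    }

    public static void main(String[] args) {
        Map<String, Long> scheduledTimes = new HashMap<>();
        long now = 10_000L; // fixed virtual clock keeps the example deterministic

        // 1000 idle splits, each scheduled 5 seconds in the future.
        for (int i = 0; i < 1000; i++) {
            scheduledTimes.put("idle-" + i, now + 5_000L);
        }
        // One non-idle split that is already due.
        scheduledTimes.put("active-0", now - 1L);

        int skipped = 0;
        for (String splitId : scheduledTimes.keySet()) {
            if (shouldSkip(scheduledTimes, splitId, now)) {
                skipped++;
            }
        }

        // Prints: skipped=1000 worstCaseDelayMillis=1000
        System.out.println(
                "skipped=" + skipped
                        + " worstCaseDelayMillis="
                        + worstCasePassDelayMillis(skipped, PAUSE_MILLIS));
    }
}
```

Under these assumptions the non-idle split waits at most about one second per pass behind 1000 idle ones; at 100 idle splits the same estimate is about 100ms.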



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
