leekeiabstraction commented on code in PR #195: URL: https://github.com/apache/flink-connector-aws/pull/195#discussion_r2031062113
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##########
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
         this.kinesis = kinesisProxy;
         this.configuration = configuration;
         this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+        this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+        this.idleSourceGetRecordsIntervalMillis =
+                configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
     }

     @Override
-    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException {
+        if (skipUntilScheduledGetRecordTime(splitState)) {
+            return null;
+        }
+
         GetRecordsResponse getRecordsResponse =
                 kinesis.getRecords(
                         splitState.getStreamArn(),
                         splitState.getShardId(),
                         splitState.getNextStartingPosition(),
                         this.maxRecordsToGet);
+
+        scheduleNextGetRecord(splitState, getRecordsResponse);
+
         boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
         return new RecordBatch(
                 getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
     }

+    private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState)
+            throws IOException {
+        if (scheduledGetRecordTimes.containsKey(splitState)
+                && scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) {
+            try {
+                Thread.sleep(1);

Review Comment:
To elaborate on Abhi's point further: this is to prevent KinesisShardSplitReaderBase from immediately looping through fetch() for each split when there are idle sources, which causes very high CPU usage. Adding a 1 ms pause keeps CPU usage low at the cost of at most 1 ms of wait time per split.

There is an edge case where a source with a large number of idle assigned splits will not loop through all of its assigned splits in a timely manner, adding latency to GetRecords for the minority of non-idle shards, but it would take upwards of 1000 assigned splits to incur a 1 second delay. Assigning upwards of 1000 active splits to a single source subtask is an unlikely and impractical scenario (the app should really be running at much higher parallelism).
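For illustration only, here is a minimal, self-contained sketch of the throttling pattern described above, not the actual connector code: each split records the earliest time at which its next GetRecords call may run, and a split polled before that time is skipped with a 1 ms sleep so the fetch loop does not spin when all splits are idle. The class name, the String split key, and the interval value are placeholders I introduced for the sketch; the PR reads the real intervals from configuration options.

```java
import java.util.HashMap;
import java.util.Map;

public class ThrottledFetchSketch {

    // Earliest wall-clock time (epoch millis) at which each split may be polled again.
    private final Map<String, Long> scheduledGetRecordTimes = new HashMap<>();

    // Placeholder interval; the connector derives this from SHARD_GET_RECORDS_INTERVAL.
    private final long getRecordsIntervalMillis = 200L;

    /** Returns true if this split is not yet due, after pausing 1 ms to bound CPU usage. */
    boolean skipUntilScheduledGetRecordTime(String splitId) throws InterruptedException {
        Long notBefore = scheduledGetRecordTimes.get(splitId);
        if (notBefore != null && notBefore > System.currentTimeMillis()) {
            // The 1 ms pause trades at most 1 ms of latency per split for much lower CPU usage.
            Thread.sleep(1);
            return true;
        }
        return false;
    }

    /** Records when this split may next be polled; called after each GetRecords response. */
    void scheduleNextGetRecord(String splitId) {
        scheduledGetRecordTimes.put(splitId, System.currentTimeMillis() + getRecordsIntervalMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        ThrottledFetchSketch reader = new ThrottledFetchSketch();
        String split = "shardId-000000000000";
        reader.scheduleNextGetRecord(split);

        // Simulate the fetcher repeatedly visiting an idle split: each visit costs ~1 ms of
        // sleep instead of busy-spinning, until the scheduled time has passed.
        int skippedPolls = 0;
        while (reader.skipUntilScheduledGetRecordTime(split)) {
            skippedPolls++;
        }
        System.out.println("Skipped " + skippedPolls + " polls before the next GetRecords was due");
    }
}
```

Under this sketch's assumptions, a subtask with N idle splits waits roughly N milliseconds per pass over its splits, which is where the "1000 splits for a 1 second delay" estimate in the comment comes from.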