[ https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558698#comment-16558698 ]
ASF GitHub Bot commented on FLINK-9897:
---------------------------------------

tweise commented on a change in pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive reads depend on run loop time instead of fetchIntervalMillis
URL: https://github.com/apache/flink/pull/6408#discussion_r205551265

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########

@@ -233,26 +225,69 @@ public void run() {
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;
-
 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
-
-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long processingEndTimeNanos = System.nanoTime();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
 		}
 	}

+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 */
+	protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {

Review comment:
   +1 except that it shouldn't be static so that a subclass can override it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9897
>                 URL: https://issues.apache.org/jira/browse/FLINK-9897
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.2, 1.5.1
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the ShardConsumer to adaptively read
> more records based on the current average record size, to optimize for the 2 MB/sec
> shard limit. That feature computes maxNumberOfRecordsPerFetch assuming a fetch
> frequency of 5 reads/sec, the maximum prescribed by Kinesis limits. When an
> application takes more time to process records in the run loop, it can no longer
> read at 5 reads/sec (even though its fetchIntervalMillis may be set to 200 ms). In
> such a scenario, maxNumberOfRecordsPerFetch should be calculated based on the time
> the run loop actually takes, as opposed to fetchIntervalMillis.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
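[Editor's note: the body of adaptRecordsToRead is collapsed in the review hunk above. The following is a minimal, hypothetical sketch of the idea being reviewed — size the next fetch so that one measured loop pass stays within the 2 MB/sec per-shard read limit. The class name, constants, and field defaults are illustrative assumptions, not the actual Flink implementation.]

```java
// Illustrative sketch only -- not the Flink ShardConsumer implementation.
public class AdaptiveReadSketch {

	// Kinesis per-shard read throughput quota: 2 MB/sec (documented Kinesis limit).
	private static final long SHARD_READ_BYTES_PER_SEC = 2 * 1024 * 1024;

	// Kinesis caps a single GetRecords call at 10,000 records (documented Kinesis limit).
	private static final int MAX_RECORDS_PER_GET = 10_000;

	private int maxNumberOfRecordsPerFetch = MAX_RECORDS_PER_GET;

	/**
	 * Derives the next fetch size from the measured loop time: at the 2 MB/sec
	 * shard limit, a loop pass that takes t seconds may read up to
	 * (2 MB * t) bytes, i.e. (2 MB * t) / averageRecordSize records.
	 * Non-static and protected so a subclass can override it, per the review comment.
	 */
	protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
		if (numRecords == 0 || runLoopTimeNanos == 0) {
			// Nothing to adapt on; keep the current setting.
			return maxNumberOfRecordsPerFetch;
		}
		long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
		double loopSeconds = runLoopTimeNanos / 1_000_000_000.0;
		long targetBytesPerLoop = (long) (SHARD_READ_BYTES_PER_SEC * loopSeconds);
		long records = targetBytesPerLoop / Math.max(1, averageRecordSizeBytes);
		maxNumberOfRecordsPerFetch = (int) Math.min(MAX_RECORDS_PER_GET, Math.max(1, records));
		return maxNumberOfRecordsPerFetch;
	}

	public static void main(String[] args) {
		AdaptiveReadSketch sketch = new AdaptiveReadSketch();
		// A 200 ms loop pass with 1 KB average records allows roughly
		// 2 MB/sec * 0.2 s = ~400 KB, i.e. ~400 records per fetch.
		System.out.println(sketch.adaptRecordsToRead(200_000_000L, 100, 100 * 1024L));
	}
}
```

The point of tying the calculation to runLoopTimeNanos rather than fetchIntervalMillis is that when processing is slow, the loop frequency drops below 5 reads/sec, and each fetch may therefore carry proportionally more bytes without exceeding the shard quota.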