[ https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554901#comment-16554901 ]
ASF GitHub Bot commented on FLINK-9897: --------------------------------------- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204942574 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,68 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes); + processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } } + /** + * Adjusts loop timing to match target frequency if specified. + * @param processingStartTimeNanos The start time of the run loop "work" + * @param processingEndTimeNanos The end time of the run loop "work" + * @return The System.nanoTime() after the sleep (if any) + * @throws InterruptedException + */ + protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + } + } + return endTimeNanos; + } + + /** + * Calculates how many records to read each time through the loop based on a target throughput + * and the measured frequenecy of the loop. + * @param runLoopTimeNanos The total time of one pass through the loop + * @param numRecords The number of records of the last read operation + * @param recordBatchSizeBytes The total batch size of the last read operation + */ + protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) { + if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) { + long averageRecordSizeBytes = recordBatchSizeBytes / numRecords; + // Adjust number of records to fetch from the shard depending on current average record size + // to optimize 2 Mb / sec read limits + double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos; + double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz; + maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); + // Ensure the value is not more than 10000L + maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); + } + return maxNumberOfRecordsPerFetch; --- End diff -- Oops, thanks for catching. Updated to use the return value and also to use a local variable in the method to avoid re-assigning the class variable `maxNumberOfRecordsPerFetch` twice. > Further enhance adaptive reads in Kinesis Connector to depend on run loop time > ------------------------------------------------------------------------------ > > Key: FLINK-9897 > URL: https://issues.apache.org/jira/browse/FLINK-9897 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector > Affects Versions: 1.4.2, 1.5.1 > Reporter: Lakshmi Rao > Assignee: Lakshmi Rao > Priority: Major > Labels: pull-request-available > > In FLINK-9692, we introduced the ability for the shardConsumer to adaptively > read more records based on the current average record size to optimize the 2 > Mb/sec shard limit. The feature maximizes maxNumberOfRecordsPerFetch of 5 > reads/sec (as prescribed by Kinesis limits). In the case where applications > take more time to process records in the run loop, they are no longer able to > read at a frequency of 5 reads/sec (even though their fetchIntervalMillis > maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch > should be calculated based on the time that the run loop actually takes as > opposed to fetchIntervalMillis. -- This message was sent by Atlassian JIRA (v7.6.3#76005)