[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540706#comment-16540706 ]
ASF GitHub Bot commented on FLINK-9692:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6300#discussion_r201792650

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -224,10 +232,19 @@ public void run() {
                 subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                 subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

    +        long recordBatchSizeBytes = 0L;
    +        long averageRecordSizeBytes = 0L;
    +
             for (UserRecord record : fetchedRecords) {
    +            recordBatchSizeBytes += record.getData().remaining();
                 deserializeRecordForCollectionAndUpdateState(record);
             }

    +        if (useAdaptiveReads && fetchedRecords.size() != 0) {
    --- End diff --

    nit: && !isEmpty()

> Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9692
>                 URL: https://issues.apache.org/jira/browse/FLINK-9692
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
> set for maxRecords that it can fetch from a single Kinesis getRecords call.
> However, in most real-time scenarios, the average size of a Kinesis record
> (in bytes) changes depending on the situation. For example, you could be in a
> transient scenario where you are reading large records and would hence
> like to fetch fewer records in each getRecords call (so as not to exceed the
> 2 MB/sec [per shard
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
> on the getRecords call).
> The idea here is to adapt the Kinesis connector to identify an average batch
> size prior to making the getRecords call, so that the maxRecords parameter
> can be appropriately tuned before making the call.
> This feature can be behind a
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
> flag that defaults to false.
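
For readers following the issue, the tuning idea it describes can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code from the pull request. The class name `AdaptiveMaxRecordsSketch`, both method names and the `fetchIntervalMillis` parameter are hypothetical. The byte limit is taken as 2 MiB per second per shard, and the upper bound of 10,000 is the maximum `Limit` that GetRecords accepts. The batch size is summed with `ByteBuffer.remaining()`, as in the diff above.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical helper; none of these names come from the Flink code base. */
final class AdaptiveMaxRecordsSketch {

    // Assumption: the per-shard read limit is interpreted as 2 MiB per second.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

    // GetRecords accepts a Limit of at most 10,000 records per call.
    static final int MAX_RECORDS_UPPER_BOUND = 10_000;

    private AdaptiveMaxRecordsSketch() {
    }

    /** Sums the unread bytes of every payload in the last fetched batch. */
    static long batchSizeBytes(List<ByteBuffer> payloads) {
        long total = 0L;
        for (ByteBuffer payload : payloads) {
            total += payload.remaining();
        }
        return total;
    }

    /**
     * Derives maxRecords for the next call from the previous batch.
     * Returns currentMaxRecords unchanged when there is nothing to measure.
     */
    static int adaptMaxRecords(
            long batchSizeBytes,
            int recordCount,
            long fetchIntervalMillis,
            int currentMaxRecords) {
        if (recordCount <= 0 || batchSizeBytes <= 0 || fetchIntervalMillis <= 0) {
            return currentMaxRecords;
        }
        // Average record size of the previous batch, at least one byte.
        long averageRecordSizeBytes = Math.max(1L, batchSizeBytes / recordCount);
        // Bytes that may be read per call without exceeding the per-second limit.
        long byteBudgetPerCall = SHARD_BYTES_PER_SECOND_LIMIT * fetchIntervalMillis / 1000L;
        long candidate = byteBudgetPerCall / averageRecordSizeBytes;
        // Keep the result inside the range that GetRecords accepts.
        return (int) Math.max(1L, Math.min(candidate, MAX_RECORDS_UPPER_BOUND));
    }
}
```

With one call per second and records averaging 1 MiB, `adaptMaxRecords` returns 2. With records averaging 100 bytes it is capped at 10,000. An empty batch leaves the current value untouched, which is the case the `isEmpty()` check in the review comment guards.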