Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432920

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
         }
       }

    +  /**
    +   * If the internal queue of the {@link KinesisProducer} gets too long,
    +   * flush some of the records until we are below the limit again.
    +   * We don't want to flush _all_ records at this point since that would
    +   * break record aggregation.
    +   */
    +  private void checkQueueLimit() {
    +    while (producer.getOutstandingRecordsCount() >= queueLimit) {
    --- End diff --

    You mean, log a message if we have checked more than 10 times (5 seconds) for one record? That makes sense. But we shouldn't log every time we reach the threshold, that would lead to log spam.
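For illustration, the suggestion discussed above (warn only after more than 10 unsuccessful checks for one record, rather than every time the limit is reached) could look roughly like the sketch below. This is not the code from the pull request. The class `QueueLimitCheck`, its fields, the `warnings` list standing in for a logger, and the configurable sleep interval are all hypothetical; the supplier and the flush callback stand in for `producer.getOutstandingRecordsCount()` and a flush call on the producer. The 500 ms interval is inferred from "10 times (5 seconds)".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

/**
 * Sketch of a queue-limit check that warns at most once per blocked record.
 * All names are hypothetical; this is not the code from the pull request.
 */
class QueueLimitCheck {

    /** Checks allowed before warning (10 checks at 500 ms each is about 5 seconds). */
    static final int WARN_AFTER_ATTEMPTS = 10;

    private final IntSupplier outstandingRecords; // stands in for producer.getOutstandingRecordsCount()
    private final Runnable flush;                 // stands in for flushing the producer
    private final int queueLimit;
    private final long sleepMillis;               // e.g. 500 in the scenario discussed

    /** Collected warnings; a real implementation would call a logger instead. */
    final List<String> warnings = new ArrayList<>();

    QueueLimitCheck(IntSupplier outstandingRecords, Runnable flush, int queueLimit, long sleepMillis) {
        this.outstandingRecords = outstandingRecords;
        this.flush = flush;
        this.queueLimit = queueLimit;
        this.sleepMillis = sleepMillis;
    }

    /**
     * Flushes and waits until the queue is below the limit again.
     *
     * @return the number of checks that found the queue at or above the limit
     */
    int checkQueueLimit() {
        int attempts = 0;
        while (outstandingRecords.getAsInt() >= queueLimit) {
            flush.run();
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the queue to drain", e);
            }
            attempts++;
            // Warn exactly once per call, when the threshold is first exceeded,
            // so that a long stall does not produce one log line per check.
            if (attempts == WARN_AFTER_ATTEMPTS + 1) {
                warnings.add("Queue still at or above limit " + queueLimit
                        + " after " + WARN_AFTER_ATTEMPTS + " checks");
            }
        }
        return attempts;
    }
}
```

With this shape, reaching the limit briefly produces no log output at all, and a record that stays blocked for longer than the threshold produces a single warning.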
---