Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197067733 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** + * If the internal queue of the {@link KinesisProducer} gets too long, + * flush some of the records until we are below the limit again. + * We don't want to flush _all_ records at this point since that would + * break record aggregation. + */ + private void enforceQueueLimit() { + int attempt = 0; + while (producer.getOutstandingRecordsCount() >= queueLimit) { + backpressureCycles.inc(); + if (attempt >= 10) { + LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt); + } + attempt++; + try { + backpressureLatch.await(100); --- End diff -- We might want to make the wait time configurable (as a separate PR). My reasoning is that it directly affects how long it takes until the "flush taking unusually long" warning starts popping up.
---