Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143254

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
 		}
 	}
 
+	/**
+	 * If the internal queue of the {@link KinesisProducer} gets too long,
+	 * flush some of the records until we are below the limit again.
+	 * We don't want to flush _all_ records at this point since that would
+	 * break record aggregation.
+	 */
+	private void enforceQueueLimit() {
+		int attempt = 0;
+		while (producer.getOutstandingRecordsCount() >= queueLimit) {
+			backpressureCycles.inc();
+			if (attempt >= 10) {
+				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+			}
+			attempt++;
+			try {
+				backpressureLatch.await(100);
--- End diff --

It does, but if we make it configurable, I'd rather keep the warning threshold at one second, i.e. `if (attempt >= 1000 / waitTime) { … }`.
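
A minimal sketch of that suggestion, assuming a hypothetical `waitTimeMillis` setting (the diff only shows the hard-coded `100`). The class and method names are illustrative and not part of `FlinkKinesisProducer`. Two details go beyond the comment and are assumptions: non-positive wait times are rejected to avoid a division by zero, and the result is clamped to at least 1, because integer division gives 0 for wait times above one second.

```java
/** Sketch only: derives the warning threshold from a configurable wait time. */
final class BackpressureWarning {

    /** Warn once roughly this much time has been spent waiting (one second, per the comment). */
    static final long WARN_AFTER_MILLIS = 1000L;

    /**
     * Returns the attempt count at which to start warning, i.e. 1000 / waitTime.
     * waitTimeMillis is a hypothetical configurable value, not an option shown in the diff.
     */
    static long warnAfterAttempts(long waitTimeMillis) {
        if (waitTimeMillis <= 0) {
            // Assumption: reject non-positive values to avoid division by zero.
            throw new IllegalArgumentException("waitTimeMillis must be positive");
        }
        // Integer division yields 0 for wait times above one second;
        // clamping to 1 (an assumption) avoids warning before the first wait.
        return Math.max(1L, WARN_AFTER_MILLIS / waitTimeMillis);
    }

    public static void main(String[] args) {
        for (long waitTimeMillis : new long[] {50L, 100L, 250L, 2000L}) {
            System.out.println(waitTimeMillis + " ms wait: warn from attempt "
                    + warnAfterAttempts(waitTimeMillis));
        }
    }
}
```

With the current 100 ms wait this gives the existing threshold of 10 attempts, so the check in the loop could become `if (attempt >= warnAfterAttempts(waitTimeMillis))` without changing today's behaviour.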
---