Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189432726

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
             }
         }

    +    /**
    +     * If the internal queue of the {@link KinesisProducer} gets too long,
    +     * flush some of the records until we are below the limit again.
    +     * We don't want to flush _all_ records at this point since that would
    +     * break record aggregation.
    +     */
    +    private void checkQueueLimit() {
    +        while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +            producer.flush();
    --- End diff --

> Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call?

I don't know for sure, but here's what I think: The KPL aggregates records into batches of 1MB before sending them out, in order to achieve maximum throughput. If we reach the queue limit before a 1MB batch is full, the KPL may wait for some time before sending the aggregated record anyway. The `flush()` should trigger that immediately.

Also, according to the Javadocs on `flush()`:
```
/**
 * Instruct the child process to perform a flush, sending some of the
 * records it has buffered. Applies to all streams.
 *
 * <p>
 * This does not guarantee that all buffered records will be sent, only that
 * most of them will; to flush all records and wait for completion, use
 * {@link #flushSync}.
 *
 * <p>
 * This method returns immediately without blocking.
```

So I think that `flush()` is still the right thing to do, although it might make sense to reduce the wait time. Nevertheless, `notify()`ing a lock in the callback instead of waiting for a fixed time might make more sense; I will look into that.
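To make the `notify()` idea concrete, here is a rough sketch of how it might look. It is not the code in this PR and does not touch the KPL: `QueueLimitGate`, `beforeSend` and `onRecordCompleted` are hypothetical names, the `flush` argument stands in for `producer.flush()`, and the counter stands in for `producer.getOutstandingRecordsCount()`. I am assuming that the completion callback of each record can call `onRecordCompleted()`.

```java
/** Hypothetical helper for illustration only; not part of Flink or the KPL. */
final class QueueLimitGate {
    private final Object lock = new Object();
    private final int queueLimit;
    private final long maxWaitMillis;
    private int outstanding; // guarded by lock

    QueueLimitGate(int queueLimit, long maxWaitMillis) {
        if (queueLimit <= 0 || maxWaitMillis <= 0) {
            throw new IllegalArgumentException("queueLimit and maxWaitMillis must be positive");
        }
        this.queueLimit = queueLimit;
        this.maxWaitMillis = maxWaitMillis;
    }

    /**
     * Call before handing a record to the producer. While the limit is
     * reached, request a (non-blocking) flush and wait until a completion
     * callback wakes us up or maxWaitMillis has passed.
     */
    void beforeSend(Runnable flush) throws InterruptedException {
        synchronized (lock) {
            while (outstanding >= queueLimit) {
                flush.run();              // stand-in for producer.flush()
                lock.wait(maxWaitMillis); // woken early by onRecordCompleted()
            }
            outstanding++;
        }
    }

    /** Call from the completion callback of a record (success or failure). */
    void onRecordCompleted() {
        synchronized (lock) {
            outstanding--;
            lock.notifyAll();
        }
    }

    /** Number of records currently counted as in flight. */
    int outstanding() {
        synchronized (lock) {
            return outstanding;
        }
    }
}
```

The timeout on `wait` is only a fallback in case a wake-up is missed. In the common case the sender resumes as soon as one record completes, so the fixed wait time no longer limits throughput.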
---