[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481609#comment-16481609 ]
ASF GitHub Bot commented on FLINK-9374:
---------------------------------------

Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432726

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
 		}
 	}

+	/**
+	 * If the internal queue of the {@link KinesisProducer} gets too long,
+	 * flush some of the records until we are below the limit again.
+	 * We don't want to flush _all_ records at this point since that would
+	 * break record aggregation.
+	 */
+	private void checkQueueLimit() {
+		while (producer.getOutstandingRecordsCount() >= queueLimit) {
+			producer.flush();
--- End diff --

> Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call?

I don't know for sure, but here's what I think: The KPL aggregates records into batches of up to 1MB before sending them out, in order to achieve maximum throughput. If we reach the queue limit before a 1MB batch is full, the KPL may wait for some time before sending the aggregated record anyway. The `flush()` should trigger that send immediately. Also, according to the Javadocs on `flush()`:

```
/**
 * Instruct the child process to perform a flush, sending some of the
 * records it has buffered. Applies to all streams.
 *
 * <p>
 * This does not guarantee that all buffered records will be sent, only that
 * most of them will; to flush all records and wait for completion, use
 * {@link #flushSync}.
 *
 * <p>
 * This method returns immediately without blocking.
```

So I think that `flush()` is still the right thing to do, although it might make sense to reduce the wait time. `notify()`ing a lock in the callback instead of waiting for a fixed time might nevertheless make more sense; I will look into that.
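For illustration, here is a minimal sketch of what that `notify()`-based variant could look like, assuming the KPL's `KinesisProducer` API (`getOutstandingRecordsCount()`, `flush()`) and the `producer` and `queueLimit` fields from the diff above; the monitor object, wait timeout, and callback wiring are hypothetical:

```java
// Sketch only: a lock that the record-completion callback notifies, so the
// blocked producer thread wakes up as soon as the queue drains a bit,
// instead of sleeping for a fixed interval.
private final Object queueDrainedLock = new Object();  // hypothetical name

private void checkQueueLimit() throws InterruptedException {
	while (producer.getOutstandingRecordsCount() >= queueLimit) {
		producer.flush();  // ask the KPL child process to send buffered records now
		synchronized (queueDrainedLock) {
			// Re-check under the lock so we don't wait if the queue already
			// drained; the bounded wait guards against missed notifications.
			if (producer.getOutstandingRecordsCount() >= queueLimit) {
				queueDrainedLock.wait(100);
			}
		}
	}
}

// Invoked from the per-record success/failure callback:
private void signalQueueDrained() {
	synchronized (queueDrainedLock) {
		queueDrainedLock.notifyAll();
	}
}
```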
> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
>                 Key: FLINK-9374
>                 URL: https://issues.apache.org/jira/browse/FLINK-9374
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Franz Thoma
>            Priority: Critical
>        Attachments: after.png, before.png
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may
> grow indefinitely if Flink sends records faster than the KPL can forward
> them to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued
> records are dropped after a certain amount of time, but this will lead to
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing:
> {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until
> a checkpoint is reached (at which point it blocks until the queue is
> flushed) or until it runs out of memory, whichever comes first. (This gets
> worse because the Java KPL is only a thin wrapper around a C++ process, so
> it is not even the Java process that runs out of memory, but the C++
> process.) The implicit rate limit due to checkpointing leads to a ragged
> throughput graph like this (the periods with zero throughput are the wait
> times before a checkpoint):
> !before.png! Throughput limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a
> maximum number of records that may be waiting in the KPL queue. If this
> limit is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}}
> and wait (blocking) until the queue length is below the limit again. This
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}}
> cannot accept records while waiting. For compatibility, {{queueLimit}} is
> set to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless
> a client explicitly sets the value. Setting a »sane« default value is
> unfortunately not possible, since sensible values for the limit depend on
> the record size (the limit should be chosen so that about 10–100MB of
> records per shard are accumulated before flushing, otherwise the maximum
> Kinesis throughput may not be reached).
> !after.png! Throughput with a queue limit of 100000 records (the spikes are
> checkpoints, where the queue is still flushed completely)
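To make the proposed mechanism concrete, here is a minimal sketch of how the blocking wait could sit in the sink's {{invoke()}} path, assuming the KPL's {{getOutstandingRecordsCount()}} and {{flush()}}; the field names, the {{invoke}} signature, and the fixed wait interval are illustrative assumptions, not the actual patch:

```java
// Sketch of the proposed queue-limit backpressure, not the actual patch.
private transient KinesisProducer producer;
private int queueLimit = Integer.MAX_VALUE;  // default: behavior unchanged

@Override
public void invoke(OUT value) throws Exception {
	// Block while the KPL queue is over the limit. Because invoke() does
	// not return, Flink's runtime backpressures the upstream operators.
	while (producer.getOutstandingRecordsCount() >= queueLimit) {
		producer.flush();   // nudge the KPL child process to send now
		Thread.sleep(500);  // fixed wait; see the review thread above
	}
	checkAndPropagateAsyncError();
	// ... serialize `value` and hand it to producer.addUserRecord(...)
}
```

Because the KPL queue is bounded only by this check, the chosen {{queueLimit}} directly determines how much data can accumulate per flush, which is why the 10–100MB-per-shard sizing guidance above matters.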