GitHub user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432726
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
                }
        }
     
    +   /**
    +    * If the internal queue of the {@link KinesisProducer} gets too long,
    +    * flush some of the records until we are below the limit again.
    +    * We don't want to flush _all_ records at this point since that would
    +    * break record aggregation.
    +    */
    +   private void checkQueueLimit() {
    +           while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +                   producer.flush();
    --- End diff --
    
    > Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call?
    
    I don't know for sure, but here's what I think: The KPL aggregates records
    into batches of 1MB before sending them out, in order to achieve maximum
    throughput. If we reach the queue limit before the 1MB batch is full, the
    KPL may wait for some time before sending the aggregated record anyway.
    The `flush()` call should trigger that send immediately.
    
    Also, according to the Javadocs on `flush()`:
    
    ```
        /**
         * Instruct the child process to perform a flush, sending some of the
         * records it has buffered. Applies to all streams.
         * 
         * <p>
         * This does not guarantee that all buffered records will be sent, only that
         * most of them will; to flush all records and wait for completion, use
         * {@link #flushSync}.
         * 
         * <p>
         * This method returns immediately without blocking.
    ```
    
    So I think that `flush()` is still the right thing to do, although it might
    make sense to reduce the wait time. Even so, `notify()`ing a lock in the
    callback instead of waiting a fixed time might make more sense; I will look
    into that.
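
    A rough sketch of the `notify()` idea, to make it concrete. This is not the
    code in the PR: `QueueLimitSketch`, `recordAdded()`, `recordCompleted()` and
    `maxWaitMillis` are hypothetical names, the counter stands in for
    `producer.getOutstandingRecordsCount()`, and the `Runnable` stands in for
    the non-blocking `producer.flush()`. It assumes the completion callback
    runs for every record, on success and on failure.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical stand-in for the queue-limit logic; not the actual Flink code. */
    public class QueueLimitSketch {
        private final Object lock = new Object();
        // Assumption: mirrors the producer's outstanding record count.
        private final AtomicInteger outstanding = new AtomicInteger();
        private final int queueLimit;

        public QueueLimitSketch(int queueLimit) {
            this.queueLimit = queueLimit;
        }

        /** Call when a record is handed to the producer. */
        public void recordAdded() {
            outstanding.incrementAndGet();
        }

        /** Call from the record's completion callback (success or failure). */
        public void recordCompleted() {
            outstanding.decrementAndGet();
            synchronized (lock) {
                lock.notifyAll(); // wake a writer blocked in checkQueueLimit()
            }
        }

        public int outstanding() {
            return outstanding.get();
        }

        /**
         * Blocks until the outstanding count is below the limit.
         * maxWaitMillis must be positive; it is only a safety net in case
         * a callback never fires (0 would mean "wait forever").
         */
        public void checkQueueLimit(Runnable flush, long maxWaitMillis)
                throws InterruptedException {
            while (outstanding.get() >= queueLimit) {
                flush.run(); // stand-in for the non-blocking producer.flush()
                synchronized (lock) {
                    // Re-check under the lock so a notification sent between
                    // the loop condition and wait() is not missed.
                    if (outstanding.get() >= queueLimit) {
                        lock.wait(maxWaitMillis);
                    }
                }
            }
        }
    }
    ```

    With this shape the writer wakes up as soon as a callback reports a
    completed record, and the timed `wait()` only bounds the delay if no
    callback arrives.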

