[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519142#comment-16519142 ]

ASF GitHub Bot commented on FLINK-9374:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197070282
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
                testHarness.close();
        }
     
    +   /**
    +    * Test ensuring that the producer blocks if the queue limit is exceeded,
    +    * until the queue length drops below the limit;
    +    * we set a timeout because the test will not finish if the logic is broken.
    +    */
    +   @Test(timeout = 10000)
    +   public void testBackpressure() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +           producer.setQueueLimit(1);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                           new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           UserRecordResult result = mock(UserRecordResult.class);
    +           when(result.isSuccessful()).thenReturn(true);
    +
    +           CheckedThread msg1 = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           testHarness.processElement(new StreamRecord<>("msg-1"));
    +                   }
    +           };
    +           msg1.start();
    +           msg1.trySync(100);
    +           assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    I wonder if this would introduce flakiness in the test.
    @fmthoma could you elaborate a bit here?
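
    The concern is presumably the fixed 100 ms window: trySync(100) waits at most
    100 ms, and the subsequent assertFalse assumes the non-blocking processElement
    call has already completed, which may not hold on a slow CI machine even when
    the producer logic is correct. As a rough sketch only (assuming
    CheckedThread#sync() from Flink's test utilities is acceptable here), the steps
    that are expected to complete could wait without a fixed window, relying on the
    surrounding @Test(timeout = 10000) to bound the run:

        // Sketch, not part of the PR: for steps where the thread is expected
        // NOT to block, waiting without a fixed window avoids the race; the
        // surrounding @Test(timeout = 10000) still bounds the whole test run.
        CheckedThread msg1 = new CheckedThread() {
                @Override
                public void go() throws Exception {
                        testHarness.processElement(new StreamRecord<>("msg-1"));
                }
        };
        msg1.start();
        msg1.sync(); // returns only once go() has finished (or rethrows its exception)

    The timed trySync/isAlive pattern would still be needed for the step that is
    expected to block; the sketch only concerns assertions that expect completion.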


> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
>                 Key: FLINK-9374
>                 URL: https://issues.apache.org/jira/browse/FLINK-9374
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Franz Thoma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: the 
> {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a 
> checkpoint is reached (at which point it waits until the queue is flushed), or 
> until it runs out of memory, whichever comes first. (This is made worse by the 
> fact that the Java KPL is only a thin wrapper around a C++ process, so it is 
> not even the Java process that runs out of memory, but the C++ one.) The 
> implicit rate limit imposed by checkpointing leads to a ragged throughput 
> graph like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !before.png! Throughput limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length drops below the limit again. This 
> automatically provides backpressure, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Unfortunately, a »sane« default value 
> cannot be chosen, since sensible values for the limit depend on the record 
> size (the limit should be chosen so that roughly 10–100 MB of records per 
> shard accumulate before flushing; otherwise the maximum Kinesis throughput 
> may not be reached).
> !after.png! Throughput with a queue limit of 100000 records (the spikes are 
> checkpoints, where the queue is still flushed completely)


