Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r197070282 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java --- @@ -267,6 +268,79 @@ public void go() throws Exception { testHarness.close(); } + /** + * Test ensuring that the producer blocks if the queue limit is exceeded, + * until the queue length drops below the limit; + * we set a timeout because the test will not finish if the logic is broken. + */ + @Test(timeout = 10000) + public void testBackpressure() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + producer.setQueueLimit(1); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + UserRecordResult result = mock(UserRecordResult.class); + when(result.isSuccessful()).thenReturn(true); + + CheckedThread msg1 = new CheckedThread() { + @Override + public void go() throws Exception { + testHarness.processElement(new StreamRecord<>("msg-1")); + } + }; + msg1.start(); + msg1.trySync(100); + assertFalse("Flush triggered before reaching queue limit", msg1.isAlive()); --- End diff -- I wonder if this would introduce flakiness in the test. @fmthoma could you elaborate a bit here?
---