Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197137205

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
             testHarness.close();
         }
    +    /**
    +     * Test ensuring that the producer blocks if the queue limit is exceeded,
    +     * until the queue length drops below the limit;
    +     * we set a timeout because the test will not finish if the logic is broken.
    +     */
    +    @Test(timeout = 10000)
    +    public void testBackpressure() throws Throwable {
    +        final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +        producer.setQueueLimit(1);
    +
    +        OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +        testHarness.open();
    +
    +        UserRecordResult result = mock(UserRecordResult.class);
    +        when(result.isSuccessful()).thenReturn(true);
    +
    +        CheckedThread msg1 = new CheckedThread() {
    +            @Override
    +            public void go() throws Exception {
    +                testHarness.processElement(new StreamRecord<>("msg-1"));
    +            }
    +        };
    +        msg1.start();
    +        msg1.trySync(100);
    +        assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --

@tzulitai In principle, yes, if the call `testHarness.processElement(…)` takes more than 100 milliseconds. However, I believe this is very unlikely even on slow systems, since the operation is mostly (entirely?) CPU bound. If test failures occur nevertheless, it should be no problem to increase the timeout for `msg1` and `msg2`.
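
To illustrate why raising the timeout should be cheap, here is a minimal stand-alone sketch of the "start, wait with a timeout, assert not alive" pattern. It uses plain `Thread.join(long)` rather than Flink's `CheckedThread`, on the assumption that `trySync` behaves like a timed join. `TimedJoinSketch` and `finishesWithin` are hypothetical names used only for this illustration and are not part of the PR.

```java
// Hypothetical illustration only; not part of the Flink test utilities.
final class TimedJoinSketch {

    /**
     * Runs the task on a new thread and waits at most timeoutMillis for it.
     * Returns true if the thread has terminated when the wait ends.
     * timeoutMillis must be positive: Thread.join(0) would wait forever.
     */
    static boolean finishesWithin(Runnable task, long timeoutMillis) {
        Thread worker = new Thread(task);
        worker.setDaemon(true); // a task that blocks forever must not keep the JVM alive
        worker.start();
        try {
            // Returns as soon as the worker ends, or once the timeout elapses.
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        // A short CPU-bound task: the generous 5 s timeout is not waited out when it succeeds.
        Runnable fast = () -> {
            long sum = 0;
            for (int i = 0; i < 1_000; i++) {
                sum += i;
            }
        };
        long start = System.nanoTime();
        boolean done = finishesWithin(fast, 5_000);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("fast task finished: " + done + " after ~" + elapsedMillis + " ms");

        // A blocked task (standing in for a producer waiting on a full queue).
        Runnable blocked = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
        };
        System.out.println("blocked task finished: " + finishesWithin(blocked, 100));
    }
}
```

Because a timed join returns as soon as the thread terminates, a larger timeout only costs wall-clock time in the checks where the thread is expected to stay blocked, so the extra margin on the "should already be finished" assertions is essentially free.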
---