cmick commented on a change in pull request #16023: URL: https://github.com/apache/flink/pull/16023#discussion_r645639446
########## File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ########## @@ -419,6 +421,40 @@ protected Connection setupConnection() throws Exception { Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt()); } + private static class CallsRealMethodsWithDelay extends CallsRealMethods { + + private final long delay; + + public CallsRealMethodsWithDelay(long delay) { + this.delay = delay; + } + + public Object answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(delay); + return super.answer(invocation); + } + } + + @Test(timeout = 10000L) + public void testDeliveryTimeout() throws Exception { + source.autoAck = false; + // mock delivery delay + Mockito.when(source.consumer.nextDelivery()) + .then(new CallsRealMethodsWithDelay(15000L)) + .thenThrow(new RuntimeException()); + Mockito.when(source.consumer.nextDelivery(any(Long.class))) + .then(new CallsRealMethodsWithDelay(10L)) + .thenReturn(null); + sourceThread.start(); + // wait a bit for the source to start + Thread.sleep(5); Review comment: You're right. This would require extending the test into: 1) generating and consuming at least 1 message (we need to make sure that the consumer was started), instead of waiting arbitrary 5ms 2) stopping the message delivery (similarly as its currently implemented, but after the source was started) 3) cancelling the source and waiting for it to stop (as its currently implemented) 4) checking if the job stopped gracefully (this can be verified by adding some helper boolean flag) instead of checking the exception (so this would eliminate the second Thread.sleep). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org