cmick commented on a change in pull request #16023:
URL: https://github.com/apache/flink/pull/16023#discussion_r645639446



##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -419,6 +421,40 @@ protected Connection setupConnection() throws Exception {
         Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
     }
 
+    private static class CallsRealMethodsWithDelay extends CallsRealMethods {
+
+        private final long delay;
+
+        public CallsRealMethodsWithDelay(long delay) {
+            this.delay = delay;
+        }
+
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            Thread.sleep(delay);
+            return super.answer(invocation);
+        }
+    }
+
+    @Test(timeout = 10000L)
+    public void testDeliveryTimeout() throws Exception {
+        source.autoAck = false;
+        // mock delivery delay
+        Mockito.when(source.consumer.nextDelivery())
+                .then(new CallsRealMethodsWithDelay(15000L))
+                .thenThrow(new RuntimeException());
+        Mockito.when(source.consumer.nextDelivery(any(Long.class)))
+                .then(new CallsRealMethodsWithDelay(10L))
+                .thenReturn(null);
+        sourceThread.start();
+        // wait a bit for the source to start
+        Thread.sleep(5);

Review comment:
       You're right. This would require extending the test to:
   1) generate and consume at least one message (to make sure that the 
consumer has started), instead of waiting an arbitrary 5 ms
   2) stop the message delivery (similar to how it's currently implemented, 
but after the source has started)
   3) cancel the source and wait for it to stop (as it's currently 
implemented)
   4) check whether the job stopped gracefully (this can be verified by 
adding a helper boolean flag) instead of checking the exception (this would 
eliminate the second Thread.sleep).
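
   The four steps above could be sketched roughly as follows. This is only a 
minimal illustration using plain JDK concurrency utilities, not the actual 
test: `FakeSource`, `awaitFirstMessage`, `stoppedGracefully` and `runScenario` 
are hypothetical names, and the in-memory queue stands in for the mocked 
consumer. The point is the latch that replaces the arbitrary sleep and the 
flag that replaces the exception check.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class GracefulStopSketch {

    /** Hypothetical stand-in for the source under test; not the real RMQSource. */
    static class FakeSource implements Runnable {
        private final BlockingQueue<String> queue;
        private final long deliveryTimeoutMs;
        private final CountDownLatch firstMessage = new CountDownLatch(1);
        private volatile boolean running = true;
        private volatile boolean stoppedGracefully = false;

        FakeSource(BlockingQueue<String> queue, long deliveryTimeoutMs) {
            this.queue = queue;
            this.deliveryTimeoutMs = deliveryTimeoutMs;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    // Bounded wait, so the loop re-checks `running` even if nothing arrives.
                    String msg = queue.poll(deliveryTimeoutMs, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        firstMessage.countDown();
                    }
                }
                // Reached only when the loop exits normally after cancel().
                stoppedGracefully = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void cancel() {
            running = false;
        }

        boolean awaitFirstMessage(long timeoutMs) throws InterruptedException {
            return firstMessage.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        boolean stoppedGracefully() {
            return stoppedGracefully;
        }
    }

    /** Runs the four steps and reports whether the source stopped gracefully. */
    static boolean runScenario() {
        try {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            FakeSource source = new FakeSource(queue, 10L);
            Thread sourceThread = new Thread(source);
            sourceThread.start();

            // 1) deliver one message and wait until it was consumed (no fixed sleep)
            queue.put("first");
            if (!source.awaitFirstMessage(5000L)) {
                return false;
            }

            // 2) stop the delivery: from here on nothing is put on the queue

            // 3) cancel the source and wait for its thread to finish
            source.cancel();
            sourceThread.join(5000L);

            // 4) check the helper flag instead of inspecting an exception
            return !sourceThread.isAlive() && source.stoppedGracefully();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped gracefully: " + runScenario());
    }
}
```

   In the real test the latch would presumably be counted down from the 
collected output or from the mocked delivery, and the flag would have to be 
set by the test harness around the source; both are assumptions about how the 
test could be wired, not something the PR already contains.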
   
   





