yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r965493011


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -199,116 +202,182 @@ private RetryWithToleranceOperator setupExecutor() {
 
     @Test
     public void testExecAndHandleRetriableErrorOnce() throws Exception {
-        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), 
new RetriableException("Test"), true);
     }
 
     @Test
     public void testExecAndHandleRetriableErrorThrice() throws Exception {
-        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), 
new RetriableException("Test"), true);
     }
 
     @Test
-    public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
-        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable 
Test"));
+    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws 
Exception {
+        execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 
2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
     }
 
     @Test
-    public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
-        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable 
Test"));
+    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws 
Exception {
+        execAndHandleRetriableError(6000, 6, Arrays.asList(300L, 600L, 1200L, 
2400L, 1500L), new RetriableException("Test"), false);
     }
 
-    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, 
long expectedWait, Exception e) throws Exception {
+    public void execAndHandleRetriableError(long errorRetryTimeout, int 
numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean 
successExpected) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, 
ALL, time, new ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
-        EasyMock.expect(mockOperation.call()).andReturn("Success");
 
-        replay(mockOperation);
+        if (successExpected) {
+            EasyMock.expect(mockOperation.call()).andReturn("Success");
+        }
+
+        for (Long expectedWait : expectedWaits) {
+            EasyMock.expect(exitLatch.await(expectedWait, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+                time.sleep(expectedWait);
+                return false;
+            });
+        }
+
+        replay(mockOperation, exitLatch);
 
         String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
-        assertFalse(retryWithToleranceOperator.failed());
-        assertEquals("Success", result);
-        assertEquals(expectedWait, time.hiResClockMs());
 
+        if (successExpected) {
+            assertFalse(retryWithToleranceOperator.failed());
+            assertEquals("Success", result);
+        } else {
+            assertTrue(retryWithToleranceOperator.failed());
+        }
+
+        EasyMock.verify(mockOperation);
         PowerMock.verifyAll();
     }
 
-    public void execAndHandleNonRetriableError(int 
numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+    @Test
+    public void testExecAndHandleNonRetriableError() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
-        
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
-        EasyMock.expect(mockOperation.call()).andReturn("Success");
+        EasyMock.expect(mockOperation.call()).andThrow(new 
Exception("Test")).times(1);
 
-        replay(mockOperation);
+        // expect no call to exitLatch.await() which is only called during the 
retry backoff
+
+        replay(mockOperation, exitLatch);
 
         String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
         assertTrue(retryWithToleranceOperator.failed());
         assertNull(result);
-        assertEquals(expectedWait, time.hiResClockMs());
 
+        EasyMock.verify(mockOperation);
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCheckRetryLimit() {
+    public void testExitLatch() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(500, 100, NONE, time);
-
-        time.setCurrentTimeMs(100);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
-
-        time.setCurrentTimeMs(200);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
-
-        time.setCurrentTimeMs(400);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        EasyMock.expect(mockOperation.call()).andThrow(new 
RetriableException("test")).anyTimes();
+        EasyMock.expect(exitLatch.await(300, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(600, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(600);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(1200, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(1200);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(2400, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(2400);
+            retryWithToleranceOperator.triggerStop();
+            return false;
+        });
 
-        time.setCurrentTimeMs(499);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        // expect no more calls to exitLatch.await() after 
retryWithToleranceOperator.triggerStop() is called
 
-        time.setCurrentTimeMs(501);
-        assertFalse(retryWithToleranceOperator.checkRetry(0));
+        exitLatch.countDown();
+        EasyMock.expectLastCall().once();
 
-        time.setCurrentTimeMs(600);
-        assertFalse(retryWithToleranceOperator.checkRetry(0));
+        replay(mockOperation, exitLatch);
+        retryWithToleranceOperator.execAndHandleError(mockOperation, 
Exception.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        assertEquals(4500L, time.milliseconds());
+        PowerMock.verifyAll();
     }
 
     @Test
-    public void testBackoffLimit() {
+    public void testBackoffLimit() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(5, 5000, NONE, time);
+        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(5, 5000, NONE, time, new ProcessingContext(), 
exitLatch);
 
-        long prevTs = time.hiResClockMs();
-        retryWithToleranceOperator.backoff(1, 5000);
-        assertEquals(300, time.hiResClockMs() - prevTs);
+        EasyMock.expect(exitLatch.await(300, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(600, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(600);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(1200, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(1200);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(2400, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(2400);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(500, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(500);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(0, 
TimeUnit.MILLISECONDS)).andReturn(false);
 
-        prevTs = time.hiResClockMs();
-        retryWithToleranceOperator.backoff(2, 5000);
-        assertEquals(600, time.hiResClockMs() - prevTs);
+        replay(exitLatch);
 
-        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(1, 5000);
+        retryWithToleranceOperator.backoff(2, 5000);
         retryWithToleranceOperator.backoff(3, 5000);
-        assertEquals(1200, time.hiResClockMs() - prevTs);
-
-        prevTs = time.hiResClockMs();
         retryWithToleranceOperator.backoff(4, 5000);
-        assertEquals(2400, time.hiResClockMs() - prevTs);
-
-        prevTs = time.hiResClockMs();
         retryWithToleranceOperator.backoff(5, 5000);
-        assertEquals(500, time.hiResClockMs() - prevTs);
-
-        prevTs = time.hiResClockMs();
         retryWithToleranceOperator.backoff(6, 5000);
-        assertEquals(0, time.hiResClockMs() - prevTs);
 
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testBackoffMinDurationZero() throws Exception {
+        MockTime time = new MockTime(10, 0, 0);
+        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(335, 5000, NONE, time, new ProcessingContext(), 
exitLatch);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        EasyMock.expect(mockOperation.call()).andThrow(new 
RetriableException("Test")).anyTimes();
+        EasyMock.expect(exitLatch.await(300, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        // Due to the auto tick setup on the MockTime instance, each call to 
time.milliseconds() will
+        // advance the clock by 10ms. The error retry timeout is configured to 
335 ms so that after
+        // the first retry which backs off for 300 ms, just enough time will 
pass between the
+        // deadline check (when the current time in milliseconds will be 330) 
and the second backoff call
+        // so that in the backoff method, the current time (now 345) will have 
exceeded the deadline.
+        // This will cause the backoff delay to be 0ms (min possible delay 
since it shouldn't be negative)
+        EasyMock.expect(exitLatch.await(0, 
TimeUnit.MILLISECONDS)).andReturn(false);
+        replay(mockOperation, exitLatch);
+
+        retryWithToleranceOperator.execAndRetry(mockOperation);
+        assertTrue(retryWithToleranceOperator.failed());
+        PowerMock.verifyAll();
+    }

Review Comment:
   Ah yeah, this is better, since the new test case is a little brittle: any 
change to the number of times `time.milliseconds()` is called in 
`execAndRetry` / `backoff` could potentially break it. This approach is also 
simpler and easier to follow. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to