C0urante commented on code in PR #12478: URL: https://github.com/apache/kafka/pull/12478#discussion_r964923239
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -199,116 +202,182 @@ private RetryWithToleranceOperator setupExecutor() { @Test public void testExecAndHandleRetriableErrorOnce() throws Exception { - execAndHandleRetriableError(1, 300, new RetriableException("Test")); + execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), new RetriableException("Test"), true); } @Test public void testExecAndHandleRetriableErrorThrice() throws Exception { - execAndHandleRetriableError(3, 2100, new RetriableException("Test")); + execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), new RetriableException("Test"), true); } @Test - public void testExecAndHandleNonRetriableErrorOnce() throws Exception { - execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test")); + public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception { + execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true); } @Test - public void testExecAndHandleNonRetriableErrorThrice() throws Exception { - execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test")); + public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception { + execAndHandleRetriableError(6000, 6, Arrays.asList(300L, 600L, 1200L, 2400L, 1500L), new RetriableException("Test"), false); } - public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception { + public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean successExpected) throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time); + 
CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); - EasyMock.expect(mockOperation.call()).andReturn("Success"); - replay(mockOperation); + if (successExpected) { + EasyMock.expect(mockOperation.call()).andReturn("Success"); + } + + for (Long expectedWait : expectedWaits) { + EasyMock.expect(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(expectedWait); + return false; + }); + } + + replay(mockOperation, exitLatch); String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); - assertFalse(retryWithToleranceOperator.failed()); - assertEquals("Success", result); - assertEquals(expectedWait, time.hiResClockMs()); + if (successExpected) { + assertFalse(retryWithToleranceOperator.failed()); + assertEquals("Success", result); + } else { + assertTrue(retryWithToleranceOperator.failed()); + } + + EasyMock.verify(mockOperation); PowerMock.verifyAll(); } - public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception { + @Test + public void testExecAndHandleNonRetriableError() throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time); + CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); - 
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); - EasyMock.expect(mockOperation.call()).andReturn("Success"); + EasyMock.expect(mockOperation.call()).andThrow(new Exception("Test")).times(1); - replay(mockOperation); + // expect no call to exitLatch.await() which is only called during the retry backoff + + replay(mockOperation, exitLatch); String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); assertTrue(retryWithToleranceOperator.failed()); assertNull(result); - assertEquals(expectedWait, time.hiResClockMs()); + EasyMock.verify(mockOperation); PowerMock.verifyAll(); } @Test - public void testCheckRetryLimit() { + public void testExitLatch() throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(500, 100, NONE, time); - - time.setCurrentTimeMs(100); - assertTrue(retryWithToleranceOperator.checkRetry(0)); - - time.setCurrentTimeMs(200); - assertTrue(retryWithToleranceOperator.checkRetry(0)); - - time.setCurrentTimeMs(400); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes(); + EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(300); + return false; + }); + EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(600); + return false; + }); + EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(1200); + return false; + }); + EasyMock.expect(exitLatch.await(2400, 
TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(2400); + retryWithToleranceOperator.triggerStop(); + return false; + }); - time.setCurrentTimeMs(499); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + // expect no more calls to exitLatch.await() after retryWithToleranceOperator.triggerStop() is called - time.setCurrentTimeMs(501); - assertFalse(retryWithToleranceOperator.checkRetry(0)); + exitLatch.countDown(); + EasyMock.expectLastCall().once(); - time.setCurrentTimeMs(600); - assertFalse(retryWithToleranceOperator.checkRetry(0)); + replay(mockOperation, exitLatch); + retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); + assertTrue(retryWithToleranceOperator.failed()); + assertEquals(4500L, time.milliseconds()); + PowerMock.verifyAll(); } @Test - public void testBackoffLimit() { + public void testBackoffLimit() throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time); + CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time, new ProcessingContext(), exitLatch); - long prevTs = time.hiResClockMs(); - retryWithToleranceOperator.backoff(1, 5000); - assertEquals(300, time.hiResClockMs() - prevTs); + EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(300); + return false; + }); + EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(600); + return false; + }); + EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(1200); + return false; + }); + EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(2400); + return false; + }); + EasyMock.expect(exitLatch.await(500, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(500); + return 
false; + }); + EasyMock.expect(exitLatch.await(0, TimeUnit.MILLISECONDS)).andReturn(false); - prevTs = time.hiResClockMs(); - retryWithToleranceOperator.backoff(2, 5000); - assertEquals(600, time.hiResClockMs() - prevTs); + replay(exitLatch); - prevTs = time.hiResClockMs(); + retryWithToleranceOperator.backoff(1, 5000); + retryWithToleranceOperator.backoff(2, 5000); retryWithToleranceOperator.backoff(3, 5000); - assertEquals(1200, time.hiResClockMs() - prevTs); - - prevTs = time.hiResClockMs(); retryWithToleranceOperator.backoff(4, 5000); - assertEquals(2400, time.hiResClockMs() - prevTs); - - prevTs = time.hiResClockMs(); retryWithToleranceOperator.backoff(5, 5000); - assertEquals(500, time.hiResClockMs() - prevTs); - - prevTs = time.hiResClockMs(); retryWithToleranceOperator.backoff(6, 5000); - assertEquals(0, time.hiResClockMs() - prevTs); PowerMock.verifyAll(); } + @Test + public void testBackoffMinDurationZero() throws Exception { + MockTime time = new MockTime(10, 0, 0); + CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(335, 5000, NONE, time, new ProcessingContext(), exitLatch); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("Test")).anyTimes(); + EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(300); + return false; + }); + // Due to the auto tick setup on the MockTime instance, each call to time.milliseconds() will + // advance the clock by 10ms. The error retry timeout is configured to 335 ms so that after + // the first retry which backs off for 300 ms, just enough time will pass between the + // deadline check (when the current time in milliseconds will be 330) and the second backoff call + // so that in the backoff method, the current time (now 345) will have exceeded the deadline. 
+ // This will cause the backoff delay to be 0ms (min possible delay since it shouldn't be negative) + EasyMock.expect(exitLatch.await(0, TimeUnit.MILLISECONDS)).andReturn(false); + replay(mockOperation, exitLatch); + + retryWithToleranceOperator.execAndRetry(mockOperation); + assertTrue(retryWithToleranceOperator.failed()); + PowerMock.verifyAll(); + }

Review Comment:
This doesn't need to be an entirely new test case; we can just tweak the existing `testBackoffLimit` case with something like this (the first and last lines are unmodified from that test case):

```java
retryWithToleranceOperator.backoff(5, 5000);
// Simulate a small delay between calculating the deadline, and backing off
time.sleep(1);
// We may try to begin backing off after the deadline has already passed; make sure
// that we don't wait with a negative timeout
retryWithToleranceOperator.backoff(6, 5000);
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org