yashmayya commented on code in PR #12478: URL: https://github.com/apache/kafka/pull/12478#discussion_r962110802
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -199,112 +202,156 @@ private RetryWithToleranceOperator setupExecutor() { @Test public void testExecAndHandleRetriableErrorOnce() throws Exception { - execAndHandleRetriableError(1, 300, new RetriableException("Test")); + execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), new RetriableException("Test"), true); } @Test public void testExecAndHandleRetriableErrorThrice() throws Exception { - execAndHandleRetriableError(3, 2100, new RetriableException("Test")); + execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), new RetriableException("Test"), true); + } + + @Test + public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception { + execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true); + } + + @Test + public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception { + execAndHandleRetriableError(6000, 10, Arrays.asList(300L, 600L, 1200L, 2400L, 1500L), new RetriableException("Test"), false); } @Test public void testExecAndHandleNonRetriableErrorOnce() throws Exception { - execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test")); + execAndHandleNonRetriableError(1, new Exception("Non Retriable Test")); } @Test public void testExecAndHandleNonRetriableErrorThrice() throws Exception { - execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test")); + execAndHandleNonRetriableError(3, new Exception("Non Retriable Test")); } - public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception { + @Test + public void testExitLatch() throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, 
ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time); + CountDownLatch exitLatch = EasyMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); + EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes(); + EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(300); + return false; + }); + EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(600); + return false; + }); + EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(1200); + return false; + }); + EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(2400); + retryWithToleranceOperator.triggerStop(); + return false; + }); - EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); - EasyMock.expect(mockOperation.call()).andReturn("Success"); - - replay(mockOperation); + // expect no more calls to exitLatch.await() after retryWithToleranceOperator.triggerStop() is called - String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); - assertFalse(retryWithToleranceOperator.failed()); - assertEquals("Success", result); - assertEquals(expectedWait, time.hiResClockMs()); + exitLatch.countDown(); + EasyMock.expectLastCall().once(); + replay(mockOperation, exitLatch); + retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); + assertTrue(retryWithToleranceOperator.failed()); + assertEquals(4500L, time.milliseconds()); PowerMock.verifyAll(); } - public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception { + public void execAndHandleRetriableError(long errorRetryTimeout, int 
numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean successExpected) throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time); + CountDownLatch exitLatch = EasyMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); EasyMock.expect(mockOperation.call()).andReturn("Success"); + for (Long expectedWait : expectedWaits) { + EasyMock.expect(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).andAnswer(() -> { + time.sleep(expectedWait); + return false; + }); + } - replay(mockOperation); + replay(mockOperation, exitLatch); String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); - assertTrue(retryWithToleranceOperator.failed()); - assertNull(result); - assertEquals(expectedWait, time.hiResClockMs()); + + if (successExpected) { + assertFalse(retryWithToleranceOperator.failed()); + assertEquals("Success", result); + } else { + assertTrue(retryWithToleranceOperator.failed()); + } PowerMock.verifyAll(); } - @Test - public void testCheckRetryLimit() { + public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, Exception e) throws Exception { MockTime time = new MockTime(0, 0, 0); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(500, 100, NONE, time); - - time.setCurrentTimeMs(100); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + CountDownLatch exitLatch = EasyMock.createStrictMock(CountDownLatch.class); + RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); + retryWithToleranceOperator.metrics(errorHandlingMetrics); - time.setCurrentTimeMs(200); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); + EasyMock.expect(mockOperation.call()).andReturn("Success"); - time.setCurrentTimeMs(400); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + // expect no call to exitLatch.await() which is only called during the retry backoff - time.setCurrentTimeMs(499); - assertTrue(retryWithToleranceOperator.checkRetry(0)); + replay(mockOperation, exitLatch); - time.setCurrentTimeMs(501); - assertFalse(retryWithToleranceOperator.checkRetry(0)); + String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); + assertTrue(retryWithToleranceOperator.failed()); + assertNull(result); - time.setCurrentTimeMs(600); - assertFalse(retryWithToleranceOperator.checkRetry(0)); + PowerMock.verifyAll(); Review Comment: I've added this verification to `execAndHandleRetriableError` (in the success case). But here in `execAndHandleNonRetriableError`, no matter what the value for `numRetriableExceptionsThrown` is, `mockOperation.call()` will only be called once (since the exception isn't retriable) - so we can't verify the expectation that `e` is thrown `numRetriableExceptionsThrown` times. I guess this would've been easier to test if this were migrated to Mockito? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org