JingGe commented on a change in pull request #18014: URL: https://github.com/apache/flink/pull/18014#discussion_r763391770
########## File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java ########## @@ -50,64 +48,71 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** A unit test class for {@link SourceReaderBase}. */ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> { - @Rule public ExpectedException expectedException = ExpectedException.none(); - @Test - public void testExceptionInSplitReader() throws Exception { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("One or more fetchers have encountered exception"); - final String errMsg = "Testing Exception"; - - FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(); - // We have to handle split changes first, otherwise fetch will not be called. 
- try (MockSourceReader reader = - new MockSourceReader( - elementsQueue, - () -> - new SplitReader<int[], MockSourceSplit>() { - @Override - public RecordsWithSplitIds<int[]> fetch() { - throw new RuntimeException(errMsg); - } - - @Override - public void handleSplitsChanges( - SplitsChange<MockSourceSplit> splitsChanges) {} - - @Override - public void wakeUp() {} - - @Override - public void close() {} - }, - getConfig(), - new TestingReaderContext())) { - ValidatingSourceOutput output = new ValidatingSourceOutput(); - reader.addSplits( - Collections.singletonList( - getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); - reader.notifyNoMoreSplits(); - // This is not a real infinite loop, it is supposed to throw exception after two polls. - while (true) { - InputStatus inputStatus = reader.pollNext(output); - assertNotEquals(InputStatus.END_OF_INPUT, inputStatus); - // Add a sleep to avoid tight loop. - Thread.sleep(1); - } - } + void testExceptionInSplitReader() { + assertThrows( Review comment: Thanks @imaffe for your contribution! Please use AssertJ, because the community has reached a consensus on using AssertJ as the assumption and assertion library. For more information, please refer to the [mailing list discussion](https://lists.apache.org/thread/33t7hz8w873p1bc5msppk65792z08rgt). 
########## File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java ########## @@ -50,64 +48,71 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** A unit test class for {@link SourceReaderBase}. */ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> { - @Rule public ExpectedException expectedException = ExpectedException.none(); - @Test - public void testExceptionInSplitReader() throws Exception { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("One or more fetchers have encountered exception"); - final String errMsg = "Testing Exception"; - - FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(); - // We have to handle split changes first, otherwise fetch will not be called. 
- try (MockSourceReader reader = - new MockSourceReader( - elementsQueue, - () -> - new SplitReader<int[], MockSourceSplit>() { - @Override - public RecordsWithSplitIds<int[]> fetch() { - throw new RuntimeException(errMsg); - } - - @Override - public void handleSplitsChanges( - SplitsChange<MockSourceSplit> splitsChanges) {} - - @Override - public void wakeUp() {} - - @Override - public void close() {} - }, - getConfig(), - new TestingReaderContext())) { - ValidatingSourceOutput output = new ValidatingSourceOutput(); - reader.addSplits( - Collections.singletonList( - getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); - reader.notifyNoMoreSplits(); - // This is not a real infinite loop, it is supposed to throw exception after two polls. - while (true) { - InputStatus inputStatus = reader.pollNext(output); - assertNotEquals(InputStatus.END_OF_INPUT, inputStatus); - // Add a sleep to avoid tight loop. - Thread.sleep(1); - } - } + void testExceptionInSplitReader() { + assertThrows( + RuntimeException.class, + () -> { + final String errMsg = "Testing Exception"; + + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + // We have to handle split changes first, otherwise fetch will not be called. 
+ try (MockSourceReader reader = + new MockSourceReader( + elementsQueue, + () -> + new SplitReader<int[], MockSourceSplit>() { + @Override + public RecordsWithSplitIds<int[]> fetch() { + throw new RuntimeException(errMsg); + } + + @Override + public void handleSplitsChanges( + SplitsChange<MockSourceSplit> + splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() {} + }, + getConfig(), + new TestingReaderContext())) { + ValidatingSourceOutput output = new ValidatingSourceOutput(); + reader.addSplits( + Collections.singletonList( + getSplit( + 0, + NUM_RECORDS_PER_SPLIT, + Boundedness.CONTINUOUS_UNBOUNDED))); + reader.notifyNoMoreSplits(); + // This is not a real infinite loop, it is supposed to throw exception after + // two polls. + while (true) { + InputStatus inputStatus = reader.pollNext(output); + assertNotEquals(InputStatus.END_OF_INPUT, inputStatus); + // Add a sleep to avoid tight loop. + Thread.sleep(1); + } + } + }, + "One or more fetchers have encountered exception"); } @Test - public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception { Review comment: Why is "public" removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org