XComp commented on code in PR #21368: URL: https://github.com/apache/flink/pull/21368#discussion_r1035860933
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java: ########## @@ -42,56 +44,66 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write; import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class ChannelStateWriteRequestDispatcherTest { - private final List<ChannelStateWriteRequest> requests; - private final Optional<Class<?>> expectedException; - public static final long CHECKPOINT_ID = 42L; - @Parameters - public static Object[][] data() { - - return new Object[][] { - // valid calls - new Object[] {empty(), asList(start(), completeIn(), completeOut())}, - new Object[] {empty(), asList(start(), writeIn(), completeIn())}, - new Object[] {empty(), asList(start(), writeOut(), completeOut())}, - new Object[] {empty(), asList(start(), writeOutFuture(), completeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOutFuture())}, - new Object[] {empty(), asList(start(), completeOut(), writeIn())}, - // invalid without start - new Object[] {of(IllegalArgumentException.class), singletonList(writeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOut())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOutFuture())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeOut())}, - // invalid double complete - new Object[] { - 
of(IllegalArgumentException.class), asList(start(), completeIn(), completeIn()) - }, - new Object[] { - of(IllegalArgumentException.class), asList(start(), completeOut(), completeOut()) - }, - // invalid write after complete - new Object[] { - of(IllegalStateException.class), asList(start(), completeIn(), writeIn()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOut()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOutFuture()) - }, - // invalid double start - new Object[] {of(IllegalStateException.class), asList(start(), start())} - }; + public static List<Object[]> data() { + ArrayList<Object[]> params = new ArrayList<>(); Review Comment: ```suggestion List<Object[]> params = new ArrayList<>(); ``` nit: it doesn't make a difference here but I just wanted to point out that it's good practice to rely on interfaces instead of classes for variable types. ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java: ########## @@ -34,61 +34,68 @@ import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** {@link ChannelStateChunkReader} test. 
*/ -public class ChannelStateChunkReaderTest { +class ChannelStateChunkReaderTest { - @Test(expected = TestException.class) - public void testBufferRecycledOnFailure() throws IOException, InterruptedException { + @Test + void testBufferRecycledOnFailure() { FailingChannelStateSerializer serializer = new FailingChannelStateSerializer(); TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler(); - try (FSDataInputStream stream = getStream(serializer, 10)) { - new ChannelStateChunkReader(serializer) - .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0); - } finally { - checkState(serializer.failed); - checkState(!handler.requestedBuffers.isEmpty()); - assertTrue( - handler.requestedBuffers.stream() - .allMatch(TestChannelStateByteBuffer::isRecycled)); - } + assertThatThrownBy( + () -> { + try (FSDataInputStream stream = getStream(serializer, 10)) { + new ChannelStateChunkReader(serializer) + .readChunk( + stream, + serializer.getHeaderLength(), + handler, + "channelInfo", + 0); + } finally { + assertThat(serializer.failed).isTrue(); + assertThat(handler.requestedBuffers) + .isNotEmpty() + .allMatch(TestChannelStateByteBuffer::isRecycled); + } + }) + .isInstanceOf(TestException.class); Review Comment: ```suggestion try (FSDataInputStream stream = getStream(serializer, 10)) { assertThatThrownBy( () -> new ChannelStateChunkReader(serializer) .readChunk( stream, serializer.getHeaderLength(), handler, "channelInfo", 0)) .isInstanceOf(TestException.class); assertThat(serializer.failed).isTrue(); assertThat(handler.requestedBuffers) .isNotEmpty() .allMatch(TestChannelStateByteBuffer::isRecycled); } ``` Putting the entire test code in the `assertThatThrownBy` is probably not what we want. The assert's callback should be as small as possible to avoid asserting other code locations by accident. 
`IOException` can be added to the test's method signature again since it's not expected in this test setup and should make the test fail ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + 
void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) this::callAddInputData)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void 
testFinishNotStarted() { + assertThatThrownBy(() -> runWithSyncWorker(this::callFinish)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + callStart(writer); + }) + .hasCauseInstanceOf(TestException.class); Review Comment: ```suggestion writer.open(); worker.setThrown(new TestException()); assertThatThrownBy(() -> callStart(writer)).hasCauseInstanceOf(TestException.class); ``` A more specific `assertThatThrownBy` helps here as well. ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java: ########## @@ -42,56 +44,66 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write; import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; /** {@link ChannelStateWriteRequestDispatcherImpl} tests. 
*/ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class ChannelStateWriteRequestDispatcherTest { - private final List<ChannelStateWriteRequest> requests; - private final Optional<Class<?>> expectedException; - public static final long CHECKPOINT_ID = 42L; - @Parameters - public static Object[][] data() { - - return new Object[][] { - // valid calls - new Object[] {empty(), asList(start(), completeIn(), completeOut())}, - new Object[] {empty(), asList(start(), writeIn(), completeIn())}, - new Object[] {empty(), asList(start(), writeOut(), completeOut())}, - new Object[] {empty(), asList(start(), writeOutFuture(), completeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOutFuture())}, - new Object[] {empty(), asList(start(), completeOut(), writeIn())}, - // invalid without start - new Object[] {of(IllegalArgumentException.class), singletonList(writeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOut())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOutFuture())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeOut())}, - // invalid double complete - new Object[] { - of(IllegalArgumentException.class), asList(start(), completeIn(), completeIn()) - }, - new Object[] { - of(IllegalArgumentException.class), asList(start(), completeOut(), completeOut()) - }, - // invalid write after complete - new Object[] { - of(IllegalStateException.class), asList(start(), completeIn(), writeIn()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOut()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOutFuture()) - }, - // invalid double start - new Object[] {of(IllegalStateException.class), asList(start(), 
start())} - }; + public static List<Object[]> data() { + ArrayList<Object[]> params = new ArrayList<>(); + // valid calls + params.add(new Object[] {empty(), asList(start(), completeIn(), completeOut())}); + params.add(new Object[] {empty(), asList(start(), writeIn(), completeIn())}); + params.add(new Object[] {empty(), asList(start(), writeOut(), completeOut())}); + params.add(new Object[] {empty(), asList(start(), writeOutFuture(), completeOut())}); + params.add(new Object[] {empty(), asList(start(), completeIn(), writeOut())}); + params.add(new Object[] {empty(), asList(start(), completeIn(), writeOutFuture())}); + params.add(new Object[] {empty(), asList(start(), completeOut(), writeIn())}); + // invalid without start + params.add(new Object[] {of(IllegalArgumentException.class), singletonList(writeIn())}); + params.add(new Object[] {of(IllegalArgumentException.class), singletonList(writeOut())}); + params.add( + new Object[] {of(IllegalArgumentException.class), singletonList(writeOutFuture())}); + params.add(new Object[] {of(IllegalArgumentException.class), singletonList(completeIn())}); + params.add(new Object[] {of(IllegalArgumentException.class), singletonList(completeOut())}); + // invalid double complete + params.add( + new Object[] { + of(IllegalArgumentException.class), asList(start(), completeIn(), completeIn()) + }); + params.add( + new Object[] { + of(IllegalArgumentException.class), + asList(start(), completeOut(), completeOut()) + }); + // invalid write after complete + params.add( + new Object[] { + of(IllegalStateException.class), asList(start(), completeIn(), writeIn()) + }); + params.add( + new Object[] { + of(IllegalStateException.class), asList(start(), completeOut(), writeOut()) + }); + params.add( + new Object[] { + of(IllegalStateException.class), + asList(start(), completeOut(), writeOutFuture()) + }); + // invalid double start + params.add(new Object[] {of(IllegalStateException.class), asList(start(), start())}); + return params; } + 
@Parameter public Optional<Class<Exception>> expectedException; + + @Parameter(value = 1) + public List<ChannelStateWriteRequest> requests; + + public static final long CHECKPOINT_ID = 42L; Review Comment: ```suggestion private static final long CHECKPOINT_ID = 42L; ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java: ########## @@ -42,56 +44,66 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write; import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class ChannelStateWriteRequestDispatcherTest { - private final List<ChannelStateWriteRequest> requests; - private final Optional<Class<?>> expectedException; - public static final long CHECKPOINT_ID = 42L; - @Parameters Review Comment: ```suggestion @Parameters(name = "expectedException={0} requests={1}") ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java: ########## @@ -42,56 +44,66 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write; import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; /** {@link ChannelStateWriteRequestDispatcherImpl} tests. 
*/ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class ChannelStateWriteRequestDispatcherTest { - private final List<ChannelStateWriteRequest> requests; - private final Optional<Class<?>> expectedException; - public static final long CHECKPOINT_ID = 42L; - @Parameters - public static Object[][] data() { - - return new Object[][] { - // valid calls - new Object[] {empty(), asList(start(), completeIn(), completeOut())}, - new Object[] {empty(), asList(start(), writeIn(), completeIn())}, - new Object[] {empty(), asList(start(), writeOut(), completeOut())}, - new Object[] {empty(), asList(start(), writeOutFuture(), completeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOut())}, - new Object[] {empty(), asList(start(), completeIn(), writeOutFuture())}, - new Object[] {empty(), asList(start(), completeOut(), writeIn())}, - // invalid without start - new Object[] {of(IllegalArgumentException.class), singletonList(writeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOut())}, - new Object[] {of(IllegalArgumentException.class), singletonList(writeOutFuture())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeIn())}, - new Object[] {of(IllegalArgumentException.class), singletonList(completeOut())}, - // invalid double complete - new Object[] { - of(IllegalArgumentException.class), asList(start(), completeIn(), completeIn()) - }, - new Object[] { - of(IllegalArgumentException.class), asList(start(), completeOut(), completeOut()) - }, - // invalid write after complete - new Object[] { - of(IllegalStateException.class), asList(start(), completeIn(), writeIn()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOut()) - }, - new Object[] { - of(IllegalStateException.class), asList(start(), completeOut(), writeOutFuture()) - }, - // invalid double start - new Object[] {of(IllegalStateException.class), asList(start(), 
start())} - }; + public static List<Object[]> data() { + ArrayList<Object[]> params = new ArrayList<>(); Review Comment: ...and using `Arrays.asList(...)` would make this collection instantiation less verbose in the sense that we wouldn't have to use a local variable and call `params.add` for each entry. ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -36,52 +35,57 @@ import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory; import static org.apache.flink.util.CloseableIterator.ofElements; -import static org.apache.flink.util.ExceptionUtils.findThrowable; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** {@link ChannelStateWriterImpl} lifecycle tests. */ -public class ChannelStateWriterImplTest { +class ChannelStateWriterImplTest { private static final long CHECKPOINT_ID = 42L; private static final String TASK_NAME = "test"; - @Test(expected = IllegalArgumentException.class) - public void testAddEventBuffer() throws Exception { + @Test + void testAddEventBuffer() { NetworkBuffer dataBuf = getBuffer(); NetworkBuffer eventBuf = getBuffer(); eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER); - try { - runWithSyncWorker( - writer -> { - callStart(writer); - writer.addInputData( - CHECKPOINT_ID, - new InputChannelInfo(1, 1), - 1, - ofElements(Buffer::recycleBuffer, eventBuf, dataBuf)); - }); - } finally { - assertTrue(dataBuf.isRecycled()); - } + assertThatThrownBy( Review Comment: We should make the `assertThatThrownBy` specific to the call that causes the issue. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); Review Comment: ```suggestion void testBuffersRecycledOnError() throws IOException { NetworkBuffer buffer = getBuffer(); try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl( TASK_NAME, new 
ConcurrentHashMap<>(), failingWorker(), 5)) { writer.open(); assertThatThrownBy(() -> callAddInputData(writer, buffer)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(TestException.class); assertThat(buffer.isRecycled()).isTrue(); } ``` We can assert on the exception here in a more-specific manner as well. This also improves the readability of the test. ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java: ########## @@ -65,49 +68,48 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.IntStream.range; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** {@link SequentialChannelStateReaderImpl} Test. */ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class SequentialChannelStateReaderImplTest { - @Parameterized.Parameters( - name = - "{0}: stateParLevel={1}, statePartsPerChannel={2}, stateBytesPerPart={3}, parLevel={4}, bufferSize={5}") - public static Object[][] parameters() { - return new Object[][] { - {"NoStateAndNoChannels", 0, 0, 0, 0, 0}, - {"NoState", 0, 10, 10, 10, 10}, - {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10}, - {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10}, - {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20}, - }; + @Parameters Review Comment: ```suggestion @Parameters(name = "{0}: stateParLevel={1}, statePartsPerChannel={2}, stateBytesPerPart={3}, parLevel={4}, bufferSize={5}") ``` The JUnit5 `@Parameters` annotation also supports custom display names ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult 
result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { 
- unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) this::callAddInputData)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testFinishNotStarted() { + assertThatThrownBy(() -> runWithSyncWorker(this::callFinish)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + })) + 
.isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + callStart(writer); + }) + .hasCauseInstanceOf(TestException.class); } - @Test(expected = IllegalStateException.class) - public void testLimit() throws IOException { + @Test + void testLimit() { int maxCheckpoints = 3; - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { - writer.open(); - for (int i = 0; i < maxCheckpoints; i++) { - writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - } - writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); - } + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory(), + maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { + writer.start( + i, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + writer.start( + maxCheckpoints, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + }) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testStartNotOpened() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) { - callStart(writer); - } - }); - } - - @Test(expected = IllegalStateException.class) - public void testNoStartAfterClose() throws Exception { - unwrappingError( - 
IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - writer.close(); - writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); - }); + @Test + void testStartNotOpened() { + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, 0, getStreamFactoryFactory())) { + callStart(writer); + } + }) + .hasCauseInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testNoAddDataAfterClose() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - callStart(writer); - writer.close(); - callAddInputData(writer); - }); + @Test + void testNoStartAfterClose() { + assertThatThrownBy( + () -> { + ChannelStateWriterImpl writer = openWriter(); + writer.close(); + writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); + }) + .hasCauseInstanceOf(IllegalStateException.class); Review Comment: ```suggestion void testNoStartAfterClose() throws IOException { ChannelStateWriterImpl writer = openWriter(); writer.close(); assertThatThrownBy( () -> writer.start( 42, CheckpointOptions.forCheckpointWithDefaultLocation())) .hasCauseInstanceOf(IllegalStateException.class); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java: ########## @@ -65,49 +68,48 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.IntStream.range; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** {@link SequentialChannelStateReaderImpl} Test. 
*/ -@RunWith(Parameterized.class) +@ExtendWith(ParameterizedTestExtension.class) public class SequentialChannelStateReaderImplTest { - @Parameterized.Parameters( - name = - "{0}: stateParLevel={1}, statePartsPerChannel={2}, stateBytesPerPart={3}, parLevel={4}, bufferSize={5}") - public static Object[][] parameters() { - return new Object[][] { - {"NoStateAndNoChannels", 0, 0, 0, 0, 0}, - {"NoState", 0, 10, 10, 10, 10}, - {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10}, - {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10}, - {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20}, - }; + @Parameters + public static List<Object[]> parameters() { + ArrayList<Object[]> params = new ArrayList<>(); + params.add(new Object[] {"NoStateAndNoChannels", 0, 0, 0, 0, 0}); + params.add(new Object[] {"NoState", 0, 10, 10, 10, 10}); + params.add(new Object[] {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10}); + params.add(new Object[] {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10}); + params.add(new Object[] {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20}); + return params; Review Comment: ```suggestion return Arrays.asList( new Object[] {"NoStateAndNoChannels", 0, 0, 0, 0, 0}, new Object[] {"NoState", 0, 10, 10, 10, 10}, new Object[] {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10}, new Object[] {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10}, new Object[] {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20}); ``` nit: you don't have to change this if you don't agree. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); 
callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) this::callAddInputData)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testFinishNotStarted() { + assertThatThrownBy(() -> runWithSyncWorker(this::callFinish)) + .isInstanceOf(IllegalArgumentException.class); + } + + 
@Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + callStart(writer); + }) + .hasCauseInstanceOf(TestException.class); } - @Test(expected = IllegalStateException.class) - public void testLimit() throws IOException { + @Test + void testLimit() { int maxCheckpoints = 3; - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { - writer.open(); - for (int i = 0; i < maxCheckpoints; i++) { - writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - } - writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); - } + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory(), + maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { + writer.start( + i, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + writer.start( + maxCheckpoints, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + }) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testStartNotOpened() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - try (ChannelStateWriterImpl writer = - new 
ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) { - callStart(writer); - } - }); - } - - @Test(expected = IllegalStateException.class) - public void testNoStartAfterClose() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - writer.close(); - writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); - }); + @Test + void testStartNotOpened() { + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, 0, getStreamFactoryFactory())) { + callStart(writer); + } + }) + .hasCauseInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testNoAddDataAfterClose() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - callStart(writer); - writer.close(); - callAddInputData(writer); - }); + @Test + void testNoStartAfterClose() { + assertThatThrownBy( + () -> { + ChannelStateWriterImpl writer = openWriter(); + writer.close(); + writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); + }) + .hasCauseInstanceOf(IllegalStateException.class); } - private static <T extends Throwable> void unwrappingError( - Class<T> clazz, RunnableWithException r) throws Exception { - try { - r.run(); - } catch (Exception e) { - throw findThrowable(e, clazz).map(te -> (Exception) te).orElse(e); - } + @Test + void testNoAddDataAfterClose() { + assertThatThrownBy( + () -> { + ChannelStateWriterImpl writer = openWriter(); + callStart(writer); + writer.close(); + callAddInputData(writer); + }) Review Comment: ```suggestion void testNoAddDataAfterClose() throws IOException { ChannelStateWriterImpl writer = openWriter(); callStart(writer); writer.close(); assertThatThrownBy(() -> callAddInputData(writer)) ``` ########## 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); 
callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) this::callAddInputData)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testFinishNotStarted() { + assertThatThrownBy(() -> runWithSyncWorker(this::callFinish)) + .isInstanceOf(IllegalArgumentException.class); + } + + 
@Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + callStart(writer); + }) + .hasCauseInstanceOf(TestException.class); } - @Test(expected = IllegalStateException.class) - public void testLimit() throws IOException { + @Test + void testLimit() { int maxCheckpoints = 3; - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { - writer.open(); - for (int i = 0; i < maxCheckpoints; i++) { - writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - } - writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); - } + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory(), + maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { + writer.start( + i, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + writer.start( + maxCheckpoints, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + }) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testStartNotOpened() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - try (ChannelStateWriterImpl writer = - new 
ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) { - callStart(writer); - } - }); - } - - @Test(expected = IllegalStateException.class) - public void testNoStartAfterClose() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - writer.close(); - writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); - }); + @Test + void testStartNotOpened() { + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, 0, getStreamFactoryFactory())) { + callStart(writer); + } + }) + .hasCauseInstanceOf(IllegalStateException.class); Review Comment: ```suggestion void testStartNotOpened() throws IOException { try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) { assertThatThrownBy(() -> callStart(writer)) .hasCauseInstanceOf(IllegalStateException.class); } ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception 
{ - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + }) + .hasCauseInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( 
- IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) this::callAddInputData)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testFinishNotStarted() { + assertThatThrownBy(() -> runWithSyncWorker(this::callFinish)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + })) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + callStart(writer); + }) + .hasCauseInstanceOf(TestException.class); } - @Test(expected = IllegalStateException.class) - public void testLimit() throws IOException { + @Test + void testLimit() { int maxCheckpoints = 3; - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - 
TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { - writer.open(); - for (int i = 0; i < maxCheckpoints; i++) { - writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - } - writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); - } + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory(), + maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { + writer.start( + i, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + writer.start( + maxCheckpoints, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + }) + .isInstanceOf(IllegalStateException.class); Review Comment: ```suggestion try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl( TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { writer.open(); for (int i = 0; i < maxCheckpoints; i++) { writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); } assertThatThrownBy( () -> writer.start( maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation())) .isInstanceOf(IllegalStateException.class); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org