XComp commented on code in PR #21368:
URL: https://github.com/apache/flink/pull/21368#discussion_r1035860933


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java:
##########
@@ -42,56 +44,66 @@
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class ChannelStateWriteRequestDispatcherTest {
 
-    private final List<ChannelStateWriteRequest> requests;
-    private final Optional<Class<?>> expectedException;
-    public static final long CHECKPOINT_ID = 42L;
-
     @Parameters
-    public static Object[][] data() {
-
-        return new Object[][] {
-            // valid calls
-            new Object[] {empty(), asList(start(), completeIn(), 
completeOut())},
-            new Object[] {empty(), asList(start(), writeIn(), completeIn())},
-            new Object[] {empty(), asList(start(), writeOut(), completeOut())},
-            new Object[] {empty(), asList(start(), writeOutFuture(), 
completeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), writeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), 
writeOutFuture())},
-            new Object[] {empty(), asList(start(), completeOut(), writeIn())},
-            // invalid without start
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOut())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOutFuture())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeOut())},
-            // invalid double complete
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeIn(), completeIn())
-            },
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeOut(), completeOut())
-            },
-            // invalid write after complete
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), completeIn(), 
writeIn())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOut())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOutFuture())
-            },
-            // invalid double start
-            new Object[] {of(IllegalStateException.class), asList(start(), 
start())}
-        };
+    public static List<Object[]> data() {
+        ArrayList<Object[]> params = new ArrayList<>();

Review Comment:
   ```suggestion
           List<Object[]> params = new ArrayList<>();
   ```
   nit: it doesn't make a difference here but I just wanted to point out that 
it's good practice to rely on interfaces instead of classes for variable types.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java:
##########
@@ -34,61 +34,68 @@
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateChunkReader} test. */
-public class ChannelStateChunkReaderTest {
+class ChannelStateChunkReaderTest {
 
-    @Test(expected = TestException.class)
-    public void testBufferRecycledOnFailure() throws IOException, 
InterruptedException {
+    @Test
+    void testBufferRecycledOnFailure() {
         FailingChannelStateSerializer serializer = new 
FailingChannelStateSerializer();
         TestRecoveredChannelStateHandler handler = new 
TestRecoveredChannelStateHandler();
 
-        try (FSDataInputStream stream = getStream(serializer, 10)) {
-            new ChannelStateChunkReader(serializer)
-                    .readChunk(stream, serializer.getHeaderLength(), handler, 
"channelInfo", 0);
-        } finally {
-            checkState(serializer.failed);
-            checkState(!handler.requestedBuffers.isEmpty());
-            assertTrue(
-                    handler.requestedBuffers.stream()
-                            .allMatch(TestChannelStateByteBuffer::isRecycled));
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (FSDataInputStream stream = 
getStream(serializer, 10)) {
+                                new ChannelStateChunkReader(serializer)
+                                        .readChunk(
+                                                stream,
+                                                serializer.getHeaderLength(),
+                                                handler,
+                                                "channelInfo",
+                                                0);
+                            } finally {
+                                assertThat(serializer.failed).isTrue();
+                                assertThat(handler.requestedBuffers)
+                                        .isNotEmpty()
+                                        
.allMatch(TestChannelStateByteBuffer::isRecycled);
+                            }
+                        })
+                .isInstanceOf(TestException.class);

Review Comment:
   ```suggestion
           try (FSDataInputStream stream = getStream(serializer, 10)) {
               assertThatThrownBy(
                               () ->
                                       new ChannelStateChunkReader(serializer)
                                               .readChunk(
                                                       stream,
                                                       
serializer.getHeaderLength(),
                                                       handler,
                                                       "channelInfo",
                                                       0))
                       .isInstanceOf(TestException.class);
               assertThat(serializer.failed).isTrue();
               assertThat(handler.requestedBuffers)
                       .isNotEmpty()
                       .allMatch(TestChannelStateByteBuffer::isRecycled);
           }
   ```
   Putting the entire test in the entire test code in the `assertThatThrownBy` 
is probably not what we want. The assert's callback should be as small as 
possible to avoid asserting other code locations by accident.
   
   `IOException` can be added to the test's method signature again since it's 
not expected in this test setup and should make the test fail



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            callStart(writer);
+                                            callFinish(writer);
+                                            callAddInputData(writer);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) 
this::callAddInputData))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(() -> runWithSyncWorker(this::callFinish))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            try {
+                                                callFinish(writer);
+                                            } catch (IllegalArgumentException 
e) {
+                                                // ignore here - should 
rethrow in
+                                                // close
+                                            }
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new 
SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new 
ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            callStart(writer);
+                        })
+                .hasCauseInstanceOf(TestException.class);

Review Comment:
   ```suggestion
           writer.open();
           worker.setThrown(new TestException());
           assertThatThrownBy(() -> 
callStart(writer)).hasCauseInstanceOf(TestException.class);
   ```
   More-specific `assertThatThrownBy` help here as well.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java:
##########
@@ -42,56 +44,66 @@
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class ChannelStateWriteRequestDispatcherTest {
 
-    private final List<ChannelStateWriteRequest> requests;
-    private final Optional<Class<?>> expectedException;
-    public static final long CHECKPOINT_ID = 42L;
-
     @Parameters
-    public static Object[][] data() {
-
-        return new Object[][] {
-            // valid calls
-            new Object[] {empty(), asList(start(), completeIn(), 
completeOut())},
-            new Object[] {empty(), asList(start(), writeIn(), completeIn())},
-            new Object[] {empty(), asList(start(), writeOut(), completeOut())},
-            new Object[] {empty(), asList(start(), writeOutFuture(), 
completeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), writeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), 
writeOutFuture())},
-            new Object[] {empty(), asList(start(), completeOut(), writeIn())},
-            // invalid without start
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOut())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOutFuture())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeOut())},
-            // invalid double complete
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeIn(), completeIn())
-            },
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeOut(), completeOut())
-            },
-            // invalid write after complete
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), completeIn(), 
writeIn())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOut())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOutFuture())
-            },
-            // invalid double start
-            new Object[] {of(IllegalStateException.class), asList(start(), 
start())}
-        };
+    public static List<Object[]> data() {
+        ArrayList<Object[]> params = new ArrayList<>();
+        // valid calls
+        params.add(new Object[] {empty(), asList(start(), completeIn(), 
completeOut())});
+        params.add(new Object[] {empty(), asList(start(), writeIn(), 
completeIn())});
+        params.add(new Object[] {empty(), asList(start(), writeOut(), 
completeOut())});
+        params.add(new Object[] {empty(), asList(start(), writeOutFuture(), 
completeOut())});
+        params.add(new Object[] {empty(), asList(start(), completeIn(), 
writeOut())});
+        params.add(new Object[] {empty(), asList(start(), completeIn(), 
writeOutFuture())});
+        params.add(new Object[] {empty(), asList(start(), completeOut(), 
writeIn())});
+        // invalid without start
+        params.add(new Object[] {of(IllegalArgumentException.class), 
singletonList(writeIn())});
+        params.add(new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOut())});
+        params.add(
+                new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOutFuture())});
+        params.add(new Object[] {of(IllegalArgumentException.class), 
singletonList(completeIn())});
+        params.add(new Object[] {of(IllegalArgumentException.class), 
singletonList(completeOut())});
+        // invalid double complete
+        params.add(
+                new Object[] {
+                    of(IllegalArgumentException.class), asList(start(), 
completeIn(), completeIn())
+                });
+        params.add(
+                new Object[] {
+                    of(IllegalArgumentException.class),
+                    asList(start(), completeOut(), completeOut())
+                });
+        // invalid write after complete
+        params.add(
+                new Object[] {
+                    of(IllegalStateException.class), asList(start(), 
completeIn(), writeIn())
+                });
+        params.add(
+                new Object[] {
+                    of(IllegalStateException.class), asList(start(), 
completeOut(), writeOut())
+                });
+        params.add(
+                new Object[] {
+                    of(IllegalStateException.class),
+                    asList(start(), completeOut(), writeOutFuture())
+                });
+        // invalid double start
+        params.add(new Object[] {of(IllegalStateException.class), 
asList(start(), start())});
+        return params;
     }
 
+    @Parameter public Optional<Class<Exception>> expectedException;
+
+    @Parameter(value = 1)
+    public List<ChannelStateWriteRequest> requests;
+
+    public static final long CHECKPOINT_ID = 42L;

Review Comment:
   ```suggestion
       private static final long CHECKPOINT_ID = 42L;
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java:
##########
@@ -42,56 +44,66 @@
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class ChannelStateWriteRequestDispatcherTest {
 
-    private final List<ChannelStateWriteRequest> requests;
-    private final Optional<Class<?>> expectedException;
-    public static final long CHECKPOINT_ID = 42L;
-
     @Parameters

Review Comment:
   ```suggestion
       @Parameters(name = "expectedException={0} requests={1}")
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java:
##########
@@ -42,56 +44,66 @@
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.completeOutput;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateWriteRequestDispatcherImpl} tests. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class ChannelStateWriteRequestDispatcherTest {
 
-    private final List<ChannelStateWriteRequest> requests;
-    private final Optional<Class<?>> expectedException;
-    public static final long CHECKPOINT_ID = 42L;
-
     @Parameters
-    public static Object[][] data() {
-
-        return new Object[][] {
-            // valid calls
-            new Object[] {empty(), asList(start(), completeIn(), 
completeOut())},
-            new Object[] {empty(), asList(start(), writeIn(), completeIn())},
-            new Object[] {empty(), asList(start(), writeOut(), completeOut())},
-            new Object[] {empty(), asList(start(), writeOutFuture(), 
completeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), writeOut())},
-            new Object[] {empty(), asList(start(), completeIn(), 
writeOutFuture())},
-            new Object[] {empty(), asList(start(), completeOut(), writeIn())},
-            // invalid without start
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOut())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOutFuture())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeIn())},
-            new Object[] {of(IllegalArgumentException.class), 
singletonList(completeOut())},
-            // invalid double complete
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeIn(), completeIn())
-            },
-            new Object[] {
-                of(IllegalArgumentException.class), asList(start(), 
completeOut(), completeOut())
-            },
-            // invalid write after complete
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), completeIn(), 
writeIn())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOut())
-            },
-            new Object[] {
-                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOutFuture())
-            },
-            // invalid double start
-            new Object[] {of(IllegalStateException.class), asList(start(), 
start())}
-        };
+    public static List<Object[]> data() {
+        ArrayList<Object[]> params = new ArrayList<>();

Review Comment:
   ...and using `Arrays.asList(...)` would make this collection instantiation 
less verbose in a sense that we would have to use a local variable and call 
`params.add` for each entry.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,57 @@
 
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {
     private static final long CHECKPOINT_ID = 42L;
     private static final String TASK_NAME = "test";
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddEventBuffer() throws Exception {
+    @Test
+    void testAddEventBuffer() {
 
         NetworkBuffer dataBuf = getBuffer();
         NetworkBuffer eventBuf = getBuffer();
         eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-        try {
-            runWithSyncWorker(
-                    writer -> {
-                        callStart(writer);
-                        writer.addInputData(
-                                CHECKPOINT_ID,
-                                new InputChannelInfo(1, 1),
-                                1,
-                                ofElements(Buffer::recycleBuffer, eventBuf, 
dataBuf));
-                    });
-        } finally {
-            assertTrue(dataBuf.isRecycled());
-        }
+        assertThatThrownBy(

Review Comment:
   we should make the `assertThatThrownBy` specific to the call that causes the 
issue.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);

Review Comment:
   ```suggestion
       void testBuffersRecycledOnError() throws IOException {
           NetworkBuffer buffer = getBuffer();
           try (ChannelStateWriterImpl writer =
                   new ChannelStateWriterImpl(
                           TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
               writer.open();
               assertThatThrownBy(() -> callAddInputData(writer, buffer))
                       .isInstanceOf(RuntimeException.class)
                       .hasCauseInstanceOf(TestException.class);
               assertThat(buffer.isRecycled()).isTrue();
           }
   ```
   We can assert on the exception here in a more-specific manner as well. This 
also improves the readability of the test.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java:
##########
@@ -65,49 +68,48 @@
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.IntStream.range;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link SequentialChannelStateReaderImpl} Test. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class SequentialChannelStateReaderImplTest {
 
-    @Parameterized.Parameters(
-            name =
-                    "{0}: stateParLevel={1}, statePartsPerChannel={2}, 
stateBytesPerPart={3},  parLevel={4}, bufferSize={5}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            {"NoStateAndNoChannels", 0, 0, 0, 0, 0},
-            {"NoState", 0, 10, 10, 10, 10},
-            {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10},
-            {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10},
-            {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20},
-        };
+    @Parameters

Review Comment:
   ```suggestion
       @Parameters(name =
                       "{0}: stateParLevel={1}, statePartsPerChannel={2}, 
stateBytesPerPart={3},  parLevel={4}, bufferSize={5}")
   ```
   The JUnit5 `@Parameters` annotation also supports custom display names



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            callStart(writer);
+                                            callFinish(writer);
+                                            callAddInputData(writer);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) 
this::callAddInputData))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(() -> runWithSyncWorker(this::callFinish))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            try {
+                                                callFinish(writer);
+                                            } catch (IllegalArgumentException 
e) {
+                                                // ignore here - should 
rethrow in
+                                                // close
+                                            }
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new 
SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new 
ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            callStart(writer);
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testLimit() throws IOException {
+    @Test
+    void testLimit() {
         int maxCheckpoints = 3;
-        try (ChannelStateWriterImpl writer =
-                new ChannelStateWriterImpl(
-                        TASK_NAME, 0, getStreamFactoryFactory(), 
maxCheckpoints)) {
-            writer.open();
-            for (int i = 0; i < maxCheckpoints; i++) {
-                writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-            }
-            writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            0,
+                                            getStreamFactoryFactory(),
+                                            maxCheckpoints)) {
+                                writer.open();
+                                for (int i = 0; i < maxCheckpoints; i++) {
+                                    writer.start(
+                                            i,
+                                            
CheckpointOptions.forCheckpointWithDefaultLocation());
+                                }
+                                writer.start(
+                                        maxCheckpoints,
+                                        
CheckpointOptions.forCheckpointWithDefaultLocation());
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testStartNotOpened() {
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME, 0, 
getStreamFactoryFactory())) {
+                                callStart(writer);
+                            }
+                        })
+                .hasCauseInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAddDataAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    callStart(writer);
-                    writer.close();
-                    callAddInputData(writer);
-                });
+    @Test
+    void testNoStartAfterClose() {
+        assertThatThrownBy(
+                        () -> {
+                            ChannelStateWriterImpl writer = openWriter();
+                            writer.close();
+                            writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+                        })
+                .hasCauseInstanceOf(IllegalStateException.class);

Review Comment:
   ```suggestion
           void testNoStartAfterClose() throws IOException {
           ChannelStateWriterImpl writer = openWriter();
           writer.close();
           assertThatThrownBy(
                           () ->
                                   writer.start(
                                           42, 
CheckpointOptions.forCheckpointWithDefaultLocation()))
                   .hasCauseInstanceOf(IllegalStateException.class);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java:
##########
@@ -65,49 +68,48 @@
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.IntStream.range;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link SequentialChannelStateReaderImpl} Test. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class SequentialChannelStateReaderImplTest {
 
-    @Parameterized.Parameters(
-            name =
-                    "{0}: stateParLevel={1}, statePartsPerChannel={2}, 
stateBytesPerPart={3},  parLevel={4}, bufferSize={5}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            {"NoStateAndNoChannels", 0, 0, 0, 0, 0},
-            {"NoState", 0, 10, 10, 10, 10},
-            {"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10},
-            {"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10},
-            {"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20},
-        };
+    @Parameters
+    public static List<Object[]> parameters() {
+        ArrayList<Object[]> params = new ArrayList<>();
+        params.add(new Object[] {"NoStateAndNoChannels", 0, 0, 0, 0, 0});
+        params.add(new Object[] {"NoState", 0, 10, 10, 10, 10});
+        params.add(new Object[] {"ReadPermutedStateWithEqualBuffer", 10, 10, 
10, 10, 10});
+        params.add(new Object[] {"ReadPermutedStateWithReducedBuffer", 10, 10, 
10, 20, 10});
+        params.add(new Object[] {"ReadPermutedStateWithIncreasedBuffer", 10, 
10, 10, 10, 20});
+        return params;

Review Comment:
   ```suggestion
           return Arrays.asList(
                   new Object[] {"NoStateAndNoChannels", 0, 0, 0, 0, 0},
                   new Object[] {"NoState", 0, 10, 10, 10, 10},
                   new Object[] {"ReadPermutedStateWithEqualBuffer", 10, 10, 
10, 10, 10},
                   new Object[] {"ReadPermutedStateWithReducedBuffer", 10, 10, 
10, 20, 10},
                   new Object[] {"ReadPermutedStateWithIncreasedBuffer", 10, 
10, 10, 10, 20});
   ```
   nit: you don't have to change this if you don't agree.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            callStart(writer);
+                                            callFinish(writer);
+                                            callAddInputData(writer);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) 
this::callAddInputData))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(() -> runWithSyncWorker(this::callFinish))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            try {
+                                                callFinish(writer);
+                                            } catch (IllegalArgumentException 
e) {
+                                                // ignore here - should 
rethrow in
+                                                // close
+                                            }
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new 
SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new 
ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            callStart(writer);
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testLimit() throws IOException {
+    @Test
+    void testLimit() {
         int maxCheckpoints = 3;
-        try (ChannelStateWriterImpl writer =
-                new ChannelStateWriterImpl(
-                        TASK_NAME, 0, getStreamFactoryFactory(), 
maxCheckpoints)) {
-            writer.open();
-            for (int i = 0; i < maxCheckpoints; i++) {
-                writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-            }
-            writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            0,
+                                            getStreamFactoryFactory(),
+                                            maxCheckpoints)) {
+                                writer.open();
+                                for (int i = 0; i < maxCheckpoints; i++) {
+                                    writer.start(
+                                            i,
+                                            
CheckpointOptions.forCheckpointWithDefaultLocation());
+                                }
+                                writer.start(
+                                        maxCheckpoints,
+                                        
CheckpointOptions.forCheckpointWithDefaultLocation());
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testStartNotOpened() {
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME, 0, 
getStreamFactoryFactory())) {
+                                callStart(writer);
+                            }
+                        })
+                .hasCauseInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAddDataAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    callStart(writer);
-                    writer.close();
-                    callAddInputData(writer);
-                });
+    @Test
+    void testNoStartAfterClose() {
+        assertThatThrownBy(
+                        () -> {
+                            ChannelStateWriterImpl writer = openWriter();
+                            writer.close();
+                            writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+                        })
+                .hasCauseInstanceOf(IllegalStateException.class);
     }
 
-    private static <T extends Throwable> void unwrappingError(
-            Class<T> clazz, RunnableWithException r) throws Exception {
-        try {
-            r.run();
-        } catch (Exception e) {
-            throw findThrowable(e, clazz).map(te -> (Exception) te).orElse(e);
-        }
+    @Test
+    void testNoAddDataAfterClose() {
+        assertThatThrownBy(
+                        () -> {
+                            ChannelStateWriterImpl writer = openWriter();
+                            callStart(writer);
+                            writer.close();
+                            callAddInputData(writer);
+                        })

Review Comment:
   ```suggestion
       void testNoAddDataAfterClose() throws IOException {
           ChannelStateWriterImpl writer = openWriter();
           callStart(writer);
           writer.close();
           assertThatThrownBy(() -> callAddInputData(writer))
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            callStart(writer);
+                                            callFinish(writer);
+                                            callAddInputData(writer);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) 
this::callAddInputData))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(() -> runWithSyncWorker(this::callFinish))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            try {
+                                                callFinish(writer);
+                                            } catch (IllegalArgumentException 
e) {
+                                                // ignore here - should 
rethrow in
+                                                // close
+                                            }
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new 
SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new 
ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            callStart(writer);
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testLimit() throws IOException {
+    @Test
+    void testLimit() {
         int maxCheckpoints = 3;
-        try (ChannelStateWriterImpl writer =
-                new ChannelStateWriterImpl(
-                        TASK_NAME, 0, getStreamFactoryFactory(), 
maxCheckpoints)) {
-            writer.open();
-            for (int i = 0; i < maxCheckpoints; i++) {
-                writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-            }
-            writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            0,
+                                            getStreamFactoryFactory(),
+                                            maxCheckpoints)) {
+                                writer.open();
+                                for (int i = 0; i < maxCheckpoints; i++) {
+                                    writer.start(
+                                            i,
+                                            
CheckpointOptions.forCheckpointWithDefaultLocation());
+                                }
+                                writer.start(
+                                        maxCheckpoints,
+                                        
CheckpointOptions.forCheckpointWithDefaultLocation());
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testStartNotOpened() {
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME, 0, 
getStreamFactoryFactory())) {
+                                callStart(writer);
+                            }
+                        })
+                .hasCauseInstanceOf(IllegalStateException.class);

Review Comment:
   ```suggestion
           void testStartNotOpened() throws IOException {
           try (ChannelStateWriterImpl writer =
                   new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
               assertThatThrownBy(() -> callStart(writer))
                       .hasCauseInstanceOf(IllegalStateException.class);
           }
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,154 +142,165 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () -> {
+                            NetworkBuffer buffer = getBuffer();
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            new ConcurrentHashMap<>(),
+                                            failingWorker(),
+                                            5)) {
+                                writer.open();
+                                callAddInputData(writer, buffer);
+                            } finally {
+                                assertThat(buffer.isRecycled()).isTrue();
+                            }
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            callStart(writer);
+                                            callFinish(writer);
+                                            callAddInputData(writer);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) 
this::callAddInputData))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(() -> runWithSyncWorker(this::callFinish))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                runWithSyncWorker(
+                                        writer -> {
+                                            try {
+                                                callFinish(writer);
+                                            } catch (IllegalArgumentException 
e) {
+                                                // ignore here - should 
rethrow in
+                                                // close
+                                            }
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new 
SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new 
ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            callStart(writer);
+                        })
+                .hasCauseInstanceOf(TestException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testLimit() throws IOException {
+    @Test
+    void testLimit() {
         int maxCheckpoints = 3;
-        try (ChannelStateWriterImpl writer =
-                new ChannelStateWriterImpl(
-                        TASK_NAME, 0, getStreamFactoryFactory(), 
maxCheckpoints)) {
-            writer.open();
-            for (int i = 0; i < maxCheckpoints; i++) {
-                writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-            }
-            writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            0,
+                                            getStreamFactoryFactory(),
+                                            maxCheckpoints)) {
+                                writer.open();
+                                for (int i = 0; i < maxCheckpoints; i++) {
+                                    writer.start(
+                                            i,
+                                            
CheckpointOptions.forCheckpointWithDefaultLocation());
+                                }
+                                writer.start(
+                                        maxCheckpoints,
+                                        
CheckpointOptions.forCheckpointWithDefaultLocation());
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class);

Review Comment:
   ```suggestion
           try (ChannelStateWriterImpl writer =
                   new ChannelStateWriterImpl(
                           TASK_NAME, 0, getStreamFactoryFactory(), 
maxCheckpoints)) {
               writer.open();
               for (int i = 0; i < maxCheckpoints; i++) {
                   writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
               }
   
               assertThatThrownBy(
                               () ->
                                       writer.start(
                                               maxCheckpoints,
                                               
CheckpointOptions.forCheckpointWithDefaultLocation()))
                       .isInstanceOf(IllegalStateException.class);
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to