XComp commented on code in PR #21368:
URL: https://github.com/apache/flink/pull/21368#discussion_r1037196683


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {
     private static final long CHECKPOINT_ID = 42L;
     private static final String TASK_NAME = "test";
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddEventBuffer() throws Exception {
+    @Test
+    void testAddEventBuffer() throws Exception {
 
         NetworkBuffer dataBuf = getBuffer();
         NetworkBuffer eventBuf = getBuffer();
         eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-        try {
-            runWithSyncWorker(
-                    writer -> {
-                        callStart(writer);
-                        writer.addInputData(
-                                CHECKPOINT_ID,
-                                new InputChannelInfo(1, 1),
-                                1,
-                                ofElements(Buffer::recycleBuffer, eventBuf, 
dataBuf));
-                    });
-        } finally {
-            assertTrue(dataBuf.isRecycled());
-        }
+
+        runWithSyncWorker(
+                (writer, worker) -> {
+                    callStart(writer);
+                    callAddInputData(writer, dataBuf);

Review Comment:
   ```suggestion
                       callAddInputData(writer, eventBuf, dataBuf);
   ```
   Are you sure that we're still testing the same thing? The original code 
didn't call `worker.processAllRequests()` twice, as far as I can see. 
:thinking: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {
     private static final long CHECKPOINT_ID = 42L;
     private static final String TASK_NAME = "test";
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddEventBuffer() throws Exception {
+    @Test
+    void testAddEventBuffer() throws Exception {
 
         NetworkBuffer dataBuf = getBuffer();
         NetworkBuffer eventBuf = getBuffer();
         eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-        try {
-            runWithSyncWorker(
-                    writer -> {
-                        callStart(writer);
-                        writer.addInputData(
-                                CHECKPOINT_ID,
-                                new InputChannelInfo(1, 1),
-                                1,
-                                ofElements(Buffer::recycleBuffer, eventBuf, 
dataBuf));
-                    });
-        } finally {
-            assertTrue(dataBuf.isRecycled());
-        }
+
+        runWithSyncWorker(

Review Comment:
   I also noticed the following: Refactoring this code to use 
`assertThatThrownBy` changes the behavior of `runWithSyncWorker` 
slightly because of `worker.processAllRequests()` in 
[ChannelStateWriterImplTest:313](https://github.com/apache/flink/blob/772de6fc8698eea8b7b42c3a1de4282a61ba7e8f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java#L313).
 The old implementation didn't call `processAllRequests()` if an exception was 
thrown. The refactored version would actually cause this method to be called 
which might change the behavior. You might want to double-check whether that's 
something we want.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,100 +134,98 @@ public void testAbortOldAndStartNewCheckpoint() throws 
Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = 
writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, 
TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> 
result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = 
writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), 
failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() throws IOException {
+        NetworkBuffer buffer = getBuffer();
+        try (ChannelStateWriterImpl writer =
+                new ChannelStateWriterImpl(
+                        TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 
5)) {
+            writer.open();
+            assertThatThrownBy(() -> callAddInputData(writer, buffer))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasCauseInstanceOf(TestException.class);
+            assertThat(buffer.isRecycled()).isTrue();
+        }
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) 
this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> 
runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in 
close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {

Review Comment:
   This entire change block has examples where we are too broad in asserting 
the exception.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {

Review Comment:
   nit: I cannot comment on that line because it wasn't touched in this PR, but 
we could also change the signature of 
`runWithSyncWorker(BiConsumerWithException<ChannelStateWriter, 
SyncChannelStateWriteRequestExecutor, Exception> testFn)` to 
`runWithSyncWorker(BiConsumer<ChannelStateWriter, 
SyncChannelStateWriteRequestExecutor> testFn)` since the exception throwing is 
not utilized anymore when relying on a more specific `assertThatThrownBy` in 
each test.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -237,52 +234,41 @@ public void testLimit() throws IOException {
             for (int i = 0; i < maxCheckpoints; i++) {
                 writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
             }
-            writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+            assertThatThrownBy(
+                            () ->
+                                    writer.start(
+                                            maxCheckpoints,
+                                            
CheckpointOptions.forCheckpointWithDefaultLocation()))
+                    .isInstanceOf(IllegalStateException.class);
         }
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testStartNotOpened() throws IOException {
+        try (ChannelStateWriterImpl writer =
+                new ChannelStateWriterImpl(TASK_NAME, 0, 
getStreamFactoryFactory())) {
+            assertThatThrownBy(() -> callStart(writer))
+                    .hasCauseInstanceOf(IllegalStateException.class);
+        }
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAddDataAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    callStart(writer);
-                    writer.close();
-                    callAddInputData(writer);
-                });
+    @Test
+    void testNoStartAfterClose() throws IOException {
+        ChannelStateWriterImpl writer = openWriter();
+        writer.close();
+        assertThatThrownBy(
+                        () ->
+                                writer.start(
+                                        42, 
CheckpointOptions.forCheckpointWithDefaultLocation()))
+                .hasCauseInstanceOf(IllegalStateException.class);
     }
 
-    private static <T extends Throwable> void unwrappingError(
-            Class<T> clazz, RunnableWithException r) throws Exception {
-        try {
-            r.run();
-        } catch (Exception e) {
-            throw findThrowable(e, clazz).map(te -> (Exception) te).orElse(e);
-        }
+    @Test
+    void testNoAddDataAfterClose() throws IOException {
+        ChannelStateWriterImpl writer = openWriter();
+        callStart(writer);
+        writer.close();
+        assertThatThrownBy(() -> callAddInputData(writer));

Review Comment:
   We're missing the expected exception here: `assertThatThrownBy(...)` is 
called without any assertion on the type of the thrown exception.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -90,23 +86,26 @@ public void testAbort() throws Exception {
                     callAddInputData(writer, buffer);
                     callAbort(writer);
                     worker.processAllRequests();
-                    assertTrue(result.isDone());
-                    assertTrue(buffer.isRecycled());
+                    assertThat(result.isDone()).isTrue();
+                    assertThat(buffer.isRecycled()).isTrue();
                 });
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAbortClearsResults() throws Exception {
-        runWithSyncWorker(
-                (writer, worker) -> {
-                    callStart(writer);
-                    writer.abort(CHECKPOINT_ID, new TestException(), true);
-                    writer.getAndRemoveWriteResult(CHECKPOINT_ID);
-                });
+    @Test
+    void testAbortClearsResults() {
+        assertThatThrownBy(

Review Comment:
   The assertion on the exception can be made more specific here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to