reswqa commented on code in PR #21368: URL: https://github.com/apache/flink/pull/21368#discussion_r1031507436
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java: ########## @@ -55,7 +55,7 @@ public void testNoCancelTwice() throws Exception { threads[i].join(); } - assertEquals(1, counter.get()); + assertThat(counter.get()).isOne(); Review Comment: ```suggestion assertThat(counter).hasValue(1); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java: ########## @@ -135,145 +144,196 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception { worker.processAllRequests(); ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42); - assertTrue(result42.isDone()); - try { - result42.getInputChannelStateHandles().get(); - fail("The result should have failed."); - } catch (Throwable throwable) { - assertTrue(findThrowable(throwable, TestException.class).isPresent()); - } + assertThat(result42.isDone()).isTrue(); + assertThatThrownBy(() -> result42.getInputChannelStateHandles().get()) + .as("The result should have failed.") + .hasCauseInstanceOf(TestException.class); ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43); - assertFalse(result43.isDone()); + assertThat(result43.isDone()).isFalse(); }); } - @Test(expected = TestException.class) - public void testBuffersRecycledOnError() throws Exception { - unwrappingError( - TestException.class, - () -> { - NetworkBuffer buffer = getBuffer(); - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) { - writer.open(); - callAddInputData(writer, buffer); - } finally { - assertTrue(buffer.isRecycled()); - } - }); + @Test + void testBuffersRecycledOnError() { + assertThatThrownBy( + () -> + unwrappingError( + TestException.class, + () -> { + NetworkBuffer buffer = getBuffer(); + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + new ConcurrentHashMap<>(), + failingWorker(), + 5)) { + writer.open(); + callAddInputData(writer, buffer); + } finally { + assertThat(buffer.isRecycled()).isTrue(); + } + })) + .isInstanceOf(TestException.class); } @Test - public void testBuffersRecycledOnClose() throws Exception { + void testBuffersRecycledOnClose() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker( writer -> { callStart(writer); callAddInputData(writer, buffer); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isRecycled()).isFalse(); }); - assertTrue(buffer.isRecycled()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoAddDataAfterFinished() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - callStart(writer); - callFinish(writer); - callAddInputData(writer); - })); - } - - @Test(expected = IllegalArgumentException.class) - public void testAddDataNotStarted() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData)); - } - - @Test(expected = IllegalArgumentException.class) - public void testFinishNotStarted() throws Exception { - unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish)); - } - - @Test(expected = IllegalArgumentException.class) - public void testRethrowOnClose() throws Exception { - unwrappingError( - IllegalArgumentException.class, - () -> - runWithSyncWorker( - writer -> { - try { - callFinish(writer); - } catch (IllegalArgumentException e) { - // ignore here - should rethrow in close - } - })); - } - - @Test(expected = TestException.class) - public void testRethrowOnNextCall() throws Exception { + assertThat(buffer.isRecycled()).isTrue(); + } + + @Test + void testNoAddDataAfterFinished() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalArgumentException.class, + () -> + runWithSyncWorker( + writer -> { + callStart(writer); + callFinish(writer); + callAddInputData(writer); + }))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAddDataNotStarted() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalArgumentException.class, + () -> + runWithSyncWorker( + (Consumer<ChannelStateWriter>) + this::callAddInputData))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testFinishNotStarted() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalArgumentException.class, + () -> runWithSyncWorker(this::callFinish))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnClose() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalArgumentException.class, + () -> + runWithSyncWorker( + writer -> { + try { + callFinish(writer); + } catch (IllegalArgumentException e) { + // ignore here - should rethrow in + // close + } + }))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testRethrowOnNextCall() { SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor(); ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5); - writer.open(); - worker.setThrown(new TestException()); - unwrappingError(TestException.class, () -> callStart(writer)); + assertThatThrownBy( + () -> { + writer.open(); + worker.setThrown(new TestException()); + unwrappingError(TestException.class, () -> callStart(writer)); + }) + .isInstanceOf(TestException.class); } - @Test(expected = IllegalStateException.class) - public void testLimit() throws IOException { + @Test + void testLimit() { int maxCheckpoints = 3; - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl( - TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) { - writer.open(); - for (int i = 0; i < maxCheckpoints; i++) { - writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - } - writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); - } + assertThatThrownBy( + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory(), + maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { + writer.start( + i, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + writer.start( + maxCheckpoints, + CheckpointOptions.forCheckpointWithDefaultLocation()); + } + }) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testStartNotOpened() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - try (ChannelStateWriterImpl writer = - new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) { - callStart(writer); - } - }); + @Test + void testStartNotOpened() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalStateException.class, + () -> { + try (ChannelStateWriterImpl writer = + new ChannelStateWriterImpl( + TASK_NAME, + 0, + getStreamFactoryFactory())) { + callStart(writer); + } + })) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) - public void testNoStartAfterClose() throws Exception { - unwrappingError( - IllegalStateException.class, - () -> { - ChannelStateWriterImpl writer = openWriter(); - writer.close(); - writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); - }); + @Test + void testNoStartAfterClose() { + assertThatThrownBy( + () -> + unwrappingError( + IllegalStateException.class, + () -> { + ChannelStateWriterImpl writer = openWriter(); + writer.close(); + writer.start( + 42, + CheckpointOptions + .forCheckpointWithDefaultLocation()); + })) + .isInstanceOf(IllegalStateException.class); Review Comment: ```suggestion Assertions.assertThatThrownBy( () -> { ChannelStateWriterImpl writer = openWriter(); writer.close(); writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation()); }) .hasCauseInstanceOf(IllegalStateException.class); ``` Maybe we can using `hasCauseInstanceOf` directly instead of `unwrappingError`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org