reswqa commented on code in PR #21943:
URL: https://github.com/apache/flink/pull/21943#discussion_r1107241118
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link DefaultScheduler}. */
public class DefaultSchedulerTest extends TestLogger {
private static final int TIMEOUT_MS = 1000;
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @TempDir private Path TEMPORARY_FOLDER;
Review Comment:
```suggestion
@TempDir private static Path temporaryFolder;
```
Checkstyle also complained about the variable name here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -211,8 +198,8 @@ public void setUp() throws Exception {
timeout = Time.seconds(60);
}
- @After
- public void tearDown() throws Exception {
+ @AfterEach
+ void tearDown() throws Exception {
Review Comment:
```suggestion
void tearDown() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -188,8 +175,8 @@ public class DefaultSchedulerTest extends TestLogger {
private Time timeout;
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() throws Exception {
Review Comment:
```suggestion
void setUp() {
```
No exception thrown in this method.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -248,20 +235,20 @@ public void testCorrectSettingOfInitializationTimestamp()
{
executionGraphInfo.getArchivedExecutionGraph();
// ensure all statuses are set in the ExecutionGraph
- assertThat(
-
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
greaterThan(0L));
-
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED),
greaterThan(0L));
-
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING),
greaterThan(0L));
+
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
+ .isGreaterThan(0L);
+
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L);
+
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L);
// ensure correct order
assertThat(
-
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)
- <=
archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED),
- Is.is(true));
+
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)
+ <=
archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED))
+ .isTrue();
Review Comment:
```suggestion
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
.isLessThanOrEqualTo(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED));
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -809,15 +796,15 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
checkpointCoordinator.triggerCheckpoint(false);
checkpointTriggeredLatch.await();
- assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(1)));
+
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(1);
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
- assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
+
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(0);
Review Comment:
```suggestion
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -285,17 +272,17 @@ public void
deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
new ExecutionVertexID(onlyJobVertexId, 3));
schedulingStrategy.schedule(verticesToSchedule);
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
Review Comment:
```suggestion
assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1132,21 +1192,23 @@ public void
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSl
ExecutionState.FAILED,
new RuntimeException(exceptionMessage)));
- assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(0));
+
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0);
Review Comment:
```suggestion
assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1273,18 +1346,21 @@ public void testExceptionHistoryWithPreDeployFailure() {
final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
scheduler.getExceptionHistory();
- assertThat(
- actualExceptionHistory,
- IsIterableContainingInOrder.contains(
- ExceptionHistoryEntryMatcher.matchesFailure(
- failureInfo.getException(),
- failureInfo.getTimestamp(),
-
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
-
taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
+ assertThat(actualExceptionHistory)
+ .anySatisfy(
+ e ->
+ ExceptionHistoryEntryMatcher.matchesFailure(
Review Comment:
ditto.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1601,12 +1681,12 @@ public void
testLateRegisteredPartitionsWillBeReleased() {
// late registered partitions will not be tracked and will be released
shuffleMaster.completeAllPendingRegistrations();
- assertThat(trackedPartitions, hasSize(0));
- assertThat(shuffleMaster.getExternallyReleasedPartitions(),
hasSize(1));
+ assertThat(trackedPartitions).hasSize(0);
Review Comment:
```suggestion
assertThat(trackedPartitions).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -285,17 +272,17 @@ public void
deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
new ExecutionVertexID(onlyJobVertexId, 3));
schedulingStrategy.schedule(verticesToSchedule);
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
Review Comment:
```suggestion
assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1511,44 +1591,44 @@ public void
testDeploymentWaitForProducedPartitionRegistration() {
createSchedulerAndStartScheduling(jobGraph);
- assertThat(trackedPartitions, hasSize(0));
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+ assertThat(trackedPartitions).hasSize(0);
Review Comment:
```suggestion
assertThat(trackedPartitions).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link DefaultScheduler}. */
public class DefaultSchedulerTest extends TestLogger {
Review Comment:
```suggestion
class DefaultSchedulerTest {
```
JUnit 5 does not recommend making test classes public.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -127,6 +127,10 @@ public enum TaskAcknowledgeResult {
private CheckpointException failureCause;
+ // Because onCompletionPromise is not required to synchronize with the
completion status of
+ // pendingCheckpoint, this flag is used to identify whether
pendingCheckpoint is completed.
+ private boolean isCompleted = false;
Review Comment:
I'm not sure whether I missed something, but why do we need this field? It
seems that only the test code uses it.
If it is only for testing purposes, I am not inclined to introduce it and
`isCompleted()`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java:
##########
@@ -160,14 +148,13 @@ public void testSyncSavepointCannotBeSubsumed() throws
Exception {
CheckpointProperties forced =
CheckpointProperties.forSyncSavepoint(true, false,
SavepointFormatType.CANONICAL);
PendingCheckpoint pending = createPendingCheckpoint(forced);
- assertFalse(pending.canBeSubsumed());
+ assertThat(pending.canBeSubsumed()).isFalse();
+ ;
Review Comment:
This semicolon should not be needed.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -481,7 +470,7 @@ private void testRestartVerticesOnFailuresInScheduling(
final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));
- assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(4));
+
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(4);
Review Comment:
```suggestion
assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1090,13 +1151,13 @@ public void
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
.iterator();
v1 = vertexIterator.next();
ArchivedExecutionVertex v2 = vertexIterator.next();
- assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
- assertThat(v2.getExecutionState(), is(ExecutionState.CANCELED));
- assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(),
hasSize(0));
+ assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+ assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
+
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0);
Review Comment:
```suggestion
assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -426,24 +413,26 @@ public void failJobIfNotEnoughResources() throws
Exception {
.getFailureInfo()
.getException()
.deserializeError(DefaultSchedulerTest.class.getClassLoader());
- assertTrue(findThrowable(failureCause,
NoResourceAvailableException.class).isPresent());
- assertTrue(
- findThrowableWithMessage(
- failureCause,
- "Could not allocate the required slot within
slot request timeout.")
- .isPresent());
- assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
+ assertThat(findThrowable(failureCause,
NoResourceAvailableException.class).isPresent())
+ .isTrue();
+ assertThat(
+ findThrowableWithMessage(
+ failureCause,
+ "Could not allocate the required slot
within slot request timeout.")
+ .isPresent())
+ .isTrue();
Review Comment:
```suggestion
assertThat(findThrowable(failureCause,
NoResourceAvailableException.class)).isPresent();
assertThat(
findThrowableWithMessage(
failureCause,
"Could not allocate the required slot within
slot request timeout."))
.isPresent();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws
Exception {
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
- assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+ assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+ }
+
+ @Test
+ void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
+ final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+ enableCheckpointing(jobGraph);
+
+ final CountDownLatch checkpointTriggeredLatch =
getCheckpointTriggeredLatch();
+
+ final CompletableFuture<JobStatus> counterShutdownFuture = new
CompletableFuture<>();
+ CheckpointIDCounter counter =
Review Comment:
Can be removed.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws
Exception {
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
- assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+ assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+ }
+
+ @Test
+ void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
Review Comment:
I don't quite understand how this test case guards the correctness of
your proposed fix. It seems that if the fix is removed, this test case
would still pass.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1048,12 +1109,12 @@ public void failureInfoIsSetAfterTaskFailure() {
final ErrorInfo failureInfo =
scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
- assertThat(failureInfo, is(notNullValue()));
- assertThat(failureInfo.getExceptionAsString(),
containsString(exceptionMessage));
+ assertThat(failureInfo).isNotNull();
+
assertThat(failureInfo.getExceptionAsString()).containsSubsequence(exceptionMessage);
Review Comment:
```suggestion
assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link DefaultScheduler}. */
public class DefaultSchedulerTest extends TestLogger {
private static final int TIMEOUT_MS = 1000;
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+ @TempDir private Path TEMPORARY_FOLDER;
Review Comment:
I had a closer look at the code. This variable can be safely
removed now.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws
Exception {
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
- assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+ assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+ }
+
+ @Test
+ void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
Review Comment:
The behavior we actually need to test should be:
1. Before `cleanupAfterCompletedCheckpoint` is completed, `checkpointFuture`
should be in an incomplete state. After `cleanupAfterCompletedCheckpoint`
is completed, the `checkpointFuture` should be in the normally completed state.
2. If an exception occurs when the checkpoint is written to the
`CheckpointStateHandleStore`, the `checkpointFuture` should either remain
incomplete or be completed exceptionally.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1373,6 +1373,7 @@ private void cleanupAfterCompletedCheckpoint(
completedCheckpoint.getTimestamp(),
extractIdIfDiscardedOnSubsumed(lastSubsumed));
}
+ pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
Review Comment:
If we encounter an exception in
`addCompletedCheckpointToStoreAndSubsumeOldest`, what should we do with
`pendingCheckpoint.getCompletionFuture()`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1171,19 +1233,23 @@ public void testExceptionHistoryWithGlobalFailOver() {
final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
scheduler.getExceptionHistory();
- assertThat(actualExceptionHistory,
IsIterableWithSize.iterableWithSize(1));
+ assertThat(actualExceptionHistory).hasSize(1);
final RootExceptionHistoryEntry failure =
actualExceptionHistory.iterator().next();
assertThat(
- failure,
- ExceptionHistoryEntryMatcher.matchesGlobalFailure(
- expectedException,
-
scheduler.getExecutionGraph().getFailureInfo().getTimestamp()));
- assertThat(failure.getConcurrentExceptions(),
IsEmptyIterable.emptyIterable());
+ ExceptionHistoryEntryMatcher.matchesGlobalFailure(
Review Comment:
Hmm, `ExceptionHistoryEntryMatcher` is a subclass of a Hamcrest matcher.
Maybe we should not use it anymore.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1229,20 +1295,28 @@ public void
testExceptionHistoryWithRestartableFailure() {
scheduler.getExceptionHistory();
// assert restarted attempt
+ assertThat(actualExceptionHistory).hasSize(2);
+ Iterator<RootExceptionHistoryEntry> iterator =
actualExceptionHistory.iterator();
+ RootExceptionHistoryEntry entry0 = iterator.next();
assertThat(
- actualExceptionHistory,
- IsIterableContainingInOrder.contains(
ExceptionHistoryEntryMatcher.matchesFailure(
Review Comment:
ditto.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -302,6 +306,10 @@ public CompletableFuture<CompletedCheckpoint>
getCompletionFuture() {
return onCompletionPromise;
}
+ public boolean isCompleted() {
Review Comment:
```suggestion
@VisibleForTesting
public boolean isCompleted() {
```
Mark it as `@VisibleForTesting`, since only test methods use it.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest(
waitForTermination(scheduler);
final JobStatus jobStatus = scheduler.requestJobStatus();
-
org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus);
-
org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1);
- org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0))
- .isEqualTo(jobGraph.getJobID());
+ assertThat(jobStatus).isEqualTo(expectedJobStatus);
+ assertThat(onCreatedJobList).hasSize(1);
+ assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID());
- org.assertj.core.api.Assertions.assertThat(onJobStatusList).hasSize(1);
- org.assertj.core.api.Assertions.assertThat(onJobStatusList.get(0))
- .isEqualTo(jobGraph.getJobID());
+ assertThat(onJobStatusList).hasSize(1);
+ assertThat(onJobStatusList.get(0)).isEqualTo(jobGraph.getJobID());
Review Comment:
ditto.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1511,44 +1591,44 @@ public void
testDeploymentWaitForProducedPartitionRegistration() {
createSchedulerAndStartScheduling(jobGraph);
- assertThat(trackedPartitions, hasSize(0));
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+ assertThat(trackedPartitions).hasSize(0);
+ assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
shuffleMaster.completeAllPendingRegistrations();
- assertThat(trackedPartitions, hasSize(1));
- assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2));
+ assertThat(trackedPartitions).hasSize(1);
+ assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2);
}
@Test
- public void testFailedProducedPartitionRegistration() {
+ void testFailedProducedPartitionRegistration() {
shuffleMaster.setAutoCompleteRegistration(false);
final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
createSchedulerAndStartScheduling(jobGraph);
- assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0));
- assertThat(testExecutionOperations.getFailedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getCanceledVertices()).hasSize(0);
Review Comment:
```suggestion
assertThat(testExecutionOperations.getCanceledVertices()).isEmpty();
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest(
waitForTermination(scheduler);
final JobStatus jobStatus = scheduler.requestJobStatus();
-
org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus);
-
org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1);
- org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0))
- .isEqualTo(jobGraph.getJobID());
+ assertThat(jobStatus).isEqualTo(expectedJobStatus);
+ assertThat(onCreatedJobList).hasSize(1);
+ assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID());
Review Comment:
```suggestion
assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]