pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r471297751



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+                       ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+                       OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = mock(OperatorSubtaskState.class);
+                       when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
+
+                       coord.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+                               @Override
+                               public String getIdentifier() {
+                                       return "anything";
+                               }
+
+                               @Override
+                               public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+                                       // Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+                                       // the master state snapshot completes.
+                                       checkpointIdRef.set(checkpointId);
+                                       AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+                                               jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+                                       AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+                                               jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+                                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+                                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+                                       return null;
+                               }
+
+                               @Override
+                               public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+
+                               }
+
+                               @Override
+                               public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+                                       return new SimpleVersionedSerializer<Integer>() {
+                                               @Override
+                                               public int getVersion() {
+                                                       return 0;
+                                               }
+
+                                               @Override
+                                               public byte[] serialize(Integer obj) throws IOException {
+                                                       return new byte[0];
+                                               }
+
+                                               @Override
+                                               public Integer deserialize(int version, byte[] serialized) throws IOException {
+                                                       return null;
+                                               }
+                                       };
+                               }
+                       });
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+                       // trigger the first checkpoint. this should succeed
+                       final CompletableFuture<CompletedCheckpoint> checkpointFuture = coord.triggerCheckpoint(false);
+                       manuallyTriggeredScheduledExecutor.triggerAll();
+                       assertFalse(checkpointFuture.isCompletedExceptionally());

Review comment:
       Using `checkpointFuture.get()` instead would provide a better error message (including the stack trace showing why it failed) in case of a failure.
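A minimal, self-contained illustration of the difference, using only `java.util.concurrent.CompletableFuture`. The class and method names are hypothetical and the failure message is made up. The sketch assumes the future has already completed, as it should after `manuallyTriggeredScheduledExecutor.triggerAll()`. On a still-pending future, `get()` would block instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureAssertionDemo {

    /** Mimics assertFalse(future.isCompletedExceptionally()): only a boolean is checked. */
    static String reportViaFlag(CompletableFuture<?> future) {
        // The reason for the failure is not available here, so JUnit 4 can only
        // throw a bare AssertionError without a message or cause.
        return future.isCompletedExceptionally() ? "AssertionError without details" : "ok";
    }

    /** Mimics calling future.get() in the test and letting the failure propagate. */
    static String reportViaGet(CompletableFuture<?> future) {
        try {
            future.get(); // rethrows a failure wrapped in an ExecutionException
            return "ok";
        } catch (ExecutionException e) {
            // The original exception, including its stack trace, is the cause.
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Object> checkpointFuture = new CompletableFuture<>();
        checkpointFuture.completeExceptionally(
                new IllegalStateException("checkpoint declined by a task"));

        System.out.println(reportViaFlag(checkpointFuture)); // AssertionError without details
        System.out.println(reportViaGet(checkpointFuture));
        // failed: java.lang.IllegalStateException: checkpoint declined by a task
    }
}
```

In the test itself, this would simply be a `checkpointFuture.get();` call, with the test method declared `throws Exception`.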

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+                       ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+                       OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = mock(OperatorSubtaskState.class);
+                       when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);

Review comment:
       Why do you need to use Mockito in this test (it's against [our coding style](https://flink.apache.org/contributing/code-style-and-quality-common.html#design-for-testability))? Both `TaskStateSnapshot` and `OperatorSubtaskState` look easy enough either to use as they are or to replace with a proper hand-written mock.
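Here is a sketch of the alternative. It assumes the test only needs the snapshot to return a given state for a given operator ID. `OpId`, `SubtaskState` and `StateSnapshot` are hypothetical stand-ins so that the example compiles on its own; they are not Flink classes. Flink's real `TaskStateSnapshot` appears to offer a `putSubtaskStateByOperatorID(...)` counterpart to the stubbed `getSubtaskStateByOperatorID(...)`, but please check the constructors available in the Flink version at hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for an operator ID (Flink's OperatorID is not used here). */
final class OpId {
    private final String value;

    OpId(String value) {
        this.value = Objects.requireNonNull(value);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OpId && ((OpId) o).value.equals(value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }
}

/** Hypothetical stand-in for OperatorSubtaskState: a plain value object. */
final class SubtaskState {
    final String name;

    SubtaskState(String name) {
        this.name = name;
    }
}

/** Hypothetical stand-in for TaskStateSnapshot: a map from operator ID to subtask state. */
final class StateSnapshot {
    private final Map<OpId, SubtaskState> states = new HashMap<>();

    StateSnapshot put(OpId id, SubtaskState state) {
        states.put(Objects.requireNonNull(id), Objects.requireNonNull(state));
        return this;
    }

    SubtaskState getSubtaskStateByOperatorID(OpId id) {
        return states.get(id);
    }
}

class RealObjectsInsteadOfMocks {
    public static void main(String[] args) {
        OpId opId1 = new OpId("op-1");
        SubtaskState subtaskState1 = new SubtaskState("state-1");

        // Instead of mock(...) + when(...).thenReturn(...), build a real object
        // that returns the expected state for the expected operator ID.
        StateSnapshot snapshot1 = new StateSnapshot().put(opId1, subtaskState1);

        System.out.println(snapshot1.getSubtaskStateByOperatorID(opId1) == subtaskState1); // true
    }
}
```

A real object also behaves sensibly for calls the test did not anticipate. For example, it returns `null` for an unknown operator ID instead of whatever default the mocking framework picks.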

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+                       ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+                       OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = mock(OperatorSubtaskState.class);
+                       when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
+
+                       coord.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+                               @Override
+                               public String getIdentifier() {
+                                       return "anything";
+                               }
+
+                               @Override

Review comment:
       nit: `@Nullable`?
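The hook in this test returns `null` from `triggerCheckpoint(...)`. Marking the override `@Nullable` makes that visible to readers and to null-checking tools. Below is a dependency-free sketch. The local `Nullable` annotation stands in for `javax.annotation.Nullable`, which Flink code commonly uses. `CheckpointHook` is a hypothetical, trimmed-down interface, not Flink's `MasterTriggerRestoreHook`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;

/** Local stand-in for javax.annotation.Nullable so the sketch has no dependencies. */
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD})
@interface Nullable {}

/** Hypothetical, trimmed-down hook shape; only the nullable trigger method is kept. */
interface CheckpointHook<T> {
    @Nullable
    CompletableFuture<T> triggerCheckpoint(long checkpointId);
}

class NullableHookDemo {

    /** A hook that contributes no master state: it returns null and says so. */
    static final CheckpointHook<Integer> NO_STATE_HOOK = new CheckpointHook<Integer>() {
        @Nullable
        @Override
        public CompletableFuture<Integer> triggerCheckpoint(long checkpointId) {
            return null;
        }
    };

    /** Caller side: the annotation is a reminder that a null result must be handled. */
    static String describe(CheckpointHook<?> hook, long checkpointId) {
        CompletableFuture<?> result = hook.triggerCheckpoint(checkpointId);
        return result == null ? "no master state" : "master state pending";
    }

    public static void main(String[] args) {
        System.out.println(describe(NO_STATE_HOOK, 1L)); // no master state
    }
}
```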

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                        request.getOnCompletionFuture()),
                                                timer);
 
-                       final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-                                       .thenCompose(this::snapshotMasterState);
-
                        final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                                        .thenComposeAsync((pendingCheckpoint) ->
                                                        OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                                        coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
+                       // We have to take the snapshot of the master hooks after the coordinator checkpoints has completed.
+                       // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+                       // ExternallyInducedSource is used.
+                       final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+                                       .thenComposeAsync(ignored -> {
+                                               PendingCheckpoint checkpoint =
+                                                       FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
       > The checkState() would never fail.
   
   That's the point of `checkState()`/`checkArgument()` calls: they assert things that shouldn't happen and would indicate a bug, giving an earlier failure and more self-documenting code. Here, for example, the message could include an explanation such as `"pendingCheckpointCompletableFuture should always be completed successfully before completing coordinatorCheckpointsComplete"`, which would save the next person from the same trouble I had understanding this part :)
   
   Besides, a diligent compiler should fail the compilation here, as `FutureUtils#getWithoutException` returns a `@Nullable` value whose nullability goes unchecked without a `checkState`/`checkNotNull` call.
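Here is a sketch of how the invariant could be spelled out, using simplified stand-ins. The `getWithoutException` and `checkState` helpers below only mimic `FutureUtils.getWithoutException` and Flink's `Preconditions.checkState(boolean, Object)`. A `String` plays the role of `PendingCheckpoint`, and `snapshotMasterState` is a hypothetical placeholder. If the invariant is ever broken, the returned future fails with the explanatory message instead of passing `null` along.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

class MasterStateInvariantDemo {

    /** Simplified stand-in for FutureUtils.getWithoutException: null unless done successfully. */
    static <T> T getWithoutException(CompletableFuture<T> future) {
        if (future.isDone()) {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException ignored) {
                // a failed future is treated like a missing value
            }
        }
        return null;
    }

    /** Simplified stand-in for Preconditions.checkState(boolean, Object). */
    static void checkState(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(errorMessage));
        }
    }

    /** Hypothetical placeholder for snapshotMasterState(PendingCheckpoint). */
    static CompletableFuture<String> snapshotMasterState(String pendingCheckpoint) {
        return CompletableFuture.completedFuture("master state of " + pendingCheckpoint);
    }

    static CompletableFuture<String> masterStatesComplete(
            CompletableFuture<String> pendingCheckpointFuture,
            CompletableFuture<?> coordinatorCheckpointsComplete,
            Executor executor) {
        return coordinatorCheckpointsComplete.thenComposeAsync(ignored -> {
            String checkpoint = getWithoutException(pendingCheckpointFuture);
            // Documents the invariant and fails early, with an explanation, if it is broken.
            checkState(checkpoint != null,
                    "pendingCheckpointCompletableFuture should always be completed successfully "
                            + "before completing coordinatorCheckpointsComplete");
            return snapshotMasterState(checkpoint);
        }, executor);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> pending = CompletableFuture.completedFuture("checkpoint-1");
        CompletableFuture<Object> coordinators = CompletableFuture.completedFuture(null);
        System.out.println(masterStatesComplete(pending, coordinators, Runnable::run).get());
        // master state of checkpoint-1
    }
}
```

With `checkNotNull(value, message)`, the lookup and the check could be folded into a single expression. Which of the two reads better is largely a matter of taste.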

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();

Review comment:
       Please expand the abbreviations in this test: `jid` forced me to scroll up and down to find out what it means. `coord` could also be renamed to `checkpointCoordinator`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */

Review comment:
       > I end up with testing in one case for simplicity
   
   It looks like the first bug fix, changing
   ```
   final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
                   .thenCompose(this::snapshotMasterState);
   ```
   to
   ```
   final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
                   .thenComposeAsync(ignored -> {
                           PendingCheckpoint checkpoint =
                                   FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
                           return snapshotMasterState(checkpoint);
                   }, timer);
   ```
   doesn't have test coverage.
   
   Could you split the two bug fixes into two commits? Besides the fact that having both fixes in one commit is confusing, it also suggests that both are covered by the same unit test.
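Here is one possible shape for a test that covers only the first fix, where the master hooks are triggered after the coordinator checkpoints. It is sketched with plain `CompletableFuture`s rather than the real `CheckpointCoordinator`. All names are hypothetical. The stages run synchronously (no `timer` executor) so that the recorded order is deterministic. The previous wiring records the master hooks first. The fixed wiring records them only after the coordinator checkpoints complete, which is the ordering a dedicated test would assert.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MasterHookOrderingSketch {

    /**
     * Runs one simulated checkpoint and returns the order in which the coordinator
     * checkpoints and the master hooks finished.
     */
    static List<String> run(boolean masterHooksAfterCoordinators) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> pendingCheckpoint = new CompletableFuture<>();

        // Coordinator checkpoints start once the pending checkpoint exists, but only
        // finish when the test completes this future (like a manually acknowledged coordinator).
        CompletableFuture<Void> coordinatorAck = new CompletableFuture<>();
        CompletableFuture<Void> coordinatorCheckpointsComplete = pendingCheckpoint
                .thenCompose(checkpoint -> coordinatorAck)
                .thenRun(() -> events.add("coordinator checkpoints"));

        // Previous wiring: hooks hang off the pending checkpoint.
        // Fixed wiring: hooks hang off the completed coordinator checkpoints.
        CompletableFuture<?> hookTrigger = masterHooksAfterCoordinators
                ? coordinatorCheckpointsComplete
                : pendingCheckpoint;
        hookTrigger.thenRun(() -> events.add("master hooks"));

        pendingCheckpoint.complete("checkpoint-1");
        coordinatorAck.complete(null);
        return events;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("coordinator checkpoints", "master hooks");
        List<String> fixed = run(true);
        List<String> previous = run(false);
        System.out.println("fixed wiring:    " + fixed);    // [coordinator checkpoints, master hooks]
        System.out.println("previous wiring: " + previous); // [master hooks, coordinator checkpoints]
        if (!fixed.equals(expected)) {
            throw new AssertionError("master hooks ran before the coordinator checkpoints");
        }
    }
}
```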



