pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r480989742



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                        
request.getOnCompletionFuture()),
                                                timer);
 
-                       final CompletableFuture<?> masterStatesComplete = 
pendingCheckpointCompletableFuture
-                                       .thenCompose(this::snapshotMasterState);
-
                        final CompletableFuture<?> 
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                                        .thenComposeAsync((pendingCheckpoint) ->
                                                        
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
+                       // We have to take the snapshot of the master hooks 
after the coordinator checkpoints has completed.
+                       // This is to ensure the tasks are checkpointed after 
the OperatorCoordinators in case
+                       // ExternallyInducedSource is used.
+                       final CompletableFuture<?> masterStatesComplete = 
coordinatorCheckpointsComplete
+                                       .thenComposeAsync(ignored -> {
+                                               PendingCheckpoint checkpoint =
+                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
       > the behavior is guaranteed by CompletableFuture, the assertion here 
would essentially be verifying CompletableFuture,
   
   Not exactly. It would be verifying that you chained a couple of futures and 
callbacks correctly. That the callback `foo()` is using `future1` result and is 
triggered once `future2` completes, and that `future1` and `future2` are 
chained (or am I still mis understanding this code?). Java library doesn't 
guarantee you that, but your code that is chaining the futures does. Which is 
outside of the `foo()`'s control, so from `foo()`s perspective, that's an 
external assumption, and falls under:
   
   > ensure the interface contract with users are not broken.
   
   Where "users" are function's callers.
   
   And as I wrote before. If something violates this assumption, and even if 
some unit test fail, it's a bit easier to understand a `checkState` compared to 
`NPE`. Note performance overhead of one if check doesn't matter here at all. 
Also it's harder of `checkState` to become outdated and misleading over time.
   
   If you have so strong feelings about, put a comment, but I do not see any 
drawback of replacing a comment with a `checkState` with the same comment but 
as a message here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = 
getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new 
AtomicReference<>();
+
+                       OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = 
mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = 
mock(OperatorSubtaskState.class);
+                       
when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
+
+                       coord.addMasterHook(new 
MasterTriggerRestoreHook<Integer>() {
+                               @Override
+                               public String getIdentifier() {
+                                       return "anything";
+                               }
+
+                               @Override
+                               public CompletableFuture<Integer> 
triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws 
Exception {
+                                       // Acknowledge the checkpoint in the 
master hooks so the task snapshots complete before
+                                       // the master state snapshot completes.
+                                       checkpointIdRef.set(checkpointId);
+                                       AcknowledgeCheckpoint 
acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+                                               jid, attemptID1, checkpointId, 
new CheckpointMetrics(), taskOperatorSubtaskStates1);
+                                       AcknowledgeCheckpoint 
acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+                                               jid, attemptID2, checkpointId, 
new CheckpointMetrics(), taskOperatorSubtaskStates2);
+                                       
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
TASK_MANAGER_LOCATION_INFO);
+                                       
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, 
TASK_MANAGER_LOCATION_INFO);
+                                       return null;
+                               }
+
+                               @Override
+                               public void restoreCheckpoint(long 
checkpointId, Integer checkpointData) throws Exception {
+
+                               }
+
+                               @Override
+                               public SimpleVersionedSerializer<Integer> 
createCheckpointDataSerializer() {
+                                       return new 
SimpleVersionedSerializer<Integer>() {
+                                               @Override
+                                               public int getVersion() {
+                                                       return 0;
+                                               }
+
+                                               @Override
+                                               public byte[] serialize(Integer 
obj) throws IOException {
+                                                       return new byte[0];
+                                               }
+
+                                               @Override
+                                               public Integer deserialize(int 
version, byte[] serialized) throws IOException {
+                                                       return null;
+                                               }
+                                       };
+                               }
+                       });
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, 
manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+                       // trigger the first checkpoint. this should succeed
+                       final CompletableFuture<CompletedCheckpoint> 
checkpointFuture = coord.triggerCheckpoint(false);
+                       manuallyTriggeredScheduledExecutor.triggerAll();
+                       
assertFalse(checkpointFuture.isCompletedExceptionally());

Review comment:
       Maybe add a helper method in 
`FutureUtils#throwIfCompletedExceptionally(checkpointFuture)`?  
   
   (I've found this when I was trying out the test coverage of this unit test, 
and `assertFalse` is not very helpful here)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();

Review comment:
       It's not a big issue, but the "convention" dates to 2015 and we are 
trying to change it recently. They are intuitive if you are reading or writing 
whole test, but not necessarily when reading a call like
   ```
   getCheckpointCoordinator(jid, vertex1, vertex2);
   ```
   newer code tries to get rid of the abbreviations here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = 
getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new 
AtomicReference<>();
+
+                       OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = 
mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = 
mock(OperatorSubtaskState.class);
+                       
when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);

Review comment:
       If you prefer, removing the mockito in all of those unit tests would be 
really great and appreciated. But if we don't have time to do it, please do not 
add instant technological debt :( Like in many other places, we are trying to 
incrementally improve code quality and this goes in the opposite direction.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to