zhijiangW commented on a change in pull request #11515: [FLINK-16744][task] 
implement channel state persistence for unaligned checkpoints
URL: https://github.com/apache/flink/pull/11515#discussion_r401353051
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -772,38 +777,18 @@ private boolean performCheckpoint(
                LOG.debug("Starting checkpoint ({}) {} on task {}",
                        checkpointMetaData.getCheckpointId(), 
checkpointOptions.getCheckpointType(), getName());
 
-               final long checkpointId = checkpointMetaData.getCheckpointId();
-
                if (isRunning) {
                        actionExecutor.runThrowing(() -> {
 
                                if 
(checkpointOptions.getCheckpointType().isSynchronous()) {
-                                       setSynchronousSavepointId(checkpointId);
+                                       
setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
 
                                        if (advanceToEndOfTime) {
                                                advanceToEndOfEventTime();
                                        }
                                }
 
-                               // All of the following steps happen as an 
atomic step from the perspective of barriers and
-                               // records/watermarks/timers/callbacks.
-                               // We generally try to emit the checkpoint 
barrier as soon as possible to not affect downstream
-                               // checkpoint alignments
-
-                               // Step (1): Prepare the checkpoint, allow 
operators to do some pre-barrier work.
-                               //           The pre-barrier work should be 
nothing or minimal in the common case.
-                               
operatorChain.prepareSnapshotPreBarrier(checkpointId);
-
-                               // Step (2): Send the checkpoint barrier 
downstream
-                               operatorChain.broadcastCheckpointBarrier(
-                                               checkpointId,
-                                               
checkpointMetaData.getTimestamp(),
-                                               checkpointOptions);
-
-                               // Step (3): Take the state snapshot. This 
should be largely asynchronous, to not
-                               //           impact progress of the streaming 
topology
-                               checkpointState(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
-
+                               
subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, 
checkpointOptions, checkpointMetrics, operatorChain, this::isCanceled);
 
 Review comment:
   nit: split the arguments into every line, seem too long.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to