zhijiangW commented on a change in pull request #11515: [FLINK-16744][task] implement channel state persistence for unaligned checkpoints URL: https://github.com/apache/flink/pull/11515#discussion_r401353051
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -772,38 +777,18 @@ private boolean performCheckpoint( LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName()); - final long checkpointId = checkpointMetaData.getCheckpointId(); - if (isRunning) { actionExecutor.runThrowing(() -> { if (checkpointOptions.getCheckpointType().isSynchronous()) { - setSynchronousSavepointId(checkpointId); + setSynchronousSavepointId(checkpointMetaData.getCheckpointId()); if (advanceToEndOfTime) { advanceToEndOfEventTime(); } } - // All of the following steps happen as an atomic step from the perspective of barriers and - // records/watermarks/timers/callbacks. - // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream - // checkpoint alignments - - // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. - // The pre-barrier work should be nothing or minimal in the common case. - operatorChain.prepareSnapshotPreBarrier(checkpointId); - - // Step (2): Send the checkpoint barrier downstream - operatorChain.broadcastCheckpointBarrier( - checkpointId, - checkpointMetaData.getTimestamp(), - checkpointOptions); - - // Step (3): Take the state snapshot. This should be largely asynchronous, to not - // impact progress of the streaming topology - checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); - + subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics, operatorChain, this::isCanceled); Review comment: nit: split the arguments into every line, seem too long. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services