rkhachatryan commented on a change in pull request #15959:
URL: https://github.com/apache/flink/pull/15959#discussion_r636323656



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1599,6 +1603,33 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();

Review comment:
      I think we should NOT modify the checkpoint object in memory.
   Otherwise, the channel state dropped from it here will not be discarded later, when the checkpoint is subsumed.
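A minimal sketch of the copy-instead-of-mutate idea, using plain `java.util` types. `SubtaskState` is a hypothetical, simplified stand-in for `OperatorSubtaskState` that models only the in-flight (channel) state, and `stateForRestore` is a made-up helper name; a real version would also have to carry over the managed/raw operator and keyed state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopyOnRestoreSketch {

    // Hypothetical, simplified stand-in for OperatorSubtaskState: only the
    // in-flight (channel) state handles are modelled, as plain strings.
    static final class SubtaskState {
        final List<String> inputChannelState;
        final List<String> resultSubpartitionState;

        SubtaskState(List<String> inputChannelState, List<String> resultSubpartitionState) {
            this.inputChannelState = inputChannelState;
            this.resultSubpartitionState = resultSubpartitionState;
        }

        boolean hasInFlightData() {
            return !inputChannelState.isEmpty() || !resultSubpartitionState.isEmpty();
        }
    }

    // Builds a NEW map to restore from. The checkpoint's own map (and the channel
    // state handles it references) is left untouched, so those handles can still
    // be discarded when the checkpoint is subsumed.
    static Map<Integer, SubtaskState> stateForRestore(Map<Integer, SubtaskState> checkpointStates) {
        Map<Integer, SubtaskState> copy = new HashMap<>();
        for (Map.Entry<Integer, SubtaskState> entry : checkpointStates.entrySet()) {
            SubtaskState state = entry.getValue();
            copy.put(
                    entry.getKey(),
                    state.hasInFlightData() ? new SubtaskState(List.of(), List.of()) : state);
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<Integer, SubtaskState> checkpointStates = new HashMap<>();
        checkpointStates.put(0, new SubtaskState(List.of("input-channel-0"), List.of()));

        Map<Integer, SubtaskState> restored = stateForRestore(checkpointStates);
        System.out.println(restored.get(0).hasInFlightData());         // false
        System.out.println(checkpointStates.get(0).hasInFlightData()); // true
    }
}
```

The restore path only ever sees the copy, while the `CompletedCheckpoint` keeps its original handles for cleanup.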
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1599,6 +1603,33 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+        if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {

Review comment:
      If not configured, this field is initialized to 0, right?
   
   And here we use a simple equality check against the checkpoint ID. This is not a problem currently, as all checkpoint counters start from 1. But that is an implementation detail of the counters, and I think it's too risky to rely on it.
   
   So I'd initialize it to -1 (possibly as the default value of the config option).
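A minimal sketch of the sentinel check, in plain Java. `NOT_CONFIGURED` and `shouldIgnoreInFlightData` are hypothetical names, not existing Flink code; the point is only that a negative sentinel cannot collide with a real checkpoint ID, whatever value a counter starts from.

```java
public class IgnoredInFlightDataCheck {

    // Hypothetical sentinel: checkpoint IDs are never negative, so this value
    // cannot match a real checkpoint, regardless of where a counter starts.
    static final long NOT_CONFIGURED = -1L;

    // True only if the option was explicitly set AND it names this checkpoint.
    static boolean shouldIgnoreInFlightData(long checkpointId, long configuredId) {
        return configuredId != NOT_CONFIGURED && checkpointId == configuredId;
    }

    public static void main(String[] args) {
        // A counter starting at 0 would have matched a default of 0; -1 avoids that.
        System.out.println(shouldIgnoreInFlightData(0L, NOT_CONFIGURED)); // false
        System.out.println(shouldIgnoreInFlightData(7L, 7L));             // true
        System.out.println(shouldIgnoreInFlightData(7L, 8L));             // false
    }
}
```

If the sentinel lives in the config option, it would presumably be declared via Flink's typed builder, e.g. `.longType().defaultValue(-1L)`.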

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1599,6 +1603,33 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) {
+        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+        if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) {
+            // rewrite the operator state with empty in-flight data.
+            for (OperatorState operatorState : operatorStates.values()) {
+                for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry :
+                        operatorState.getSubtaskStates().entrySet()) {
+
+                    OperatorSubtaskState subtaskState = subtaskStateEntry.getValue();
+                    if (!subtaskState.getResultSubpartitionState().isEmpty()
+                            || !subtaskState.getInputChannelState().isEmpty()) {
+                        operatorState.putState(
+                                subtaskStateEntry.getKey(),

Review comment:
      I'm concerned about a potential `ConcurrentModificationException`, as we are modifying the underlying map while iterating over it.
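One way to sidestep the concern is to collect the replacements during the loop and apply them only once iteration has finished. A minimal sketch with plain `java.util` types rather than Flink's `OperatorState` (the helper name `replaceMatching` is hypothetical, and the string values merely stand in for subtask states):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class ReplaceWithoutConcurrentModification {

    // Collects replacements first and writes them back only after the loop,
    // so the map is never modified while an iterator over it is still live.
    static <K, V> void replaceMatching(
            Map<K, V> map, Predicate<? super V> shouldReplace, Function<? super V, ? extends V> replacement) {
        Map<K, V> updates = new HashMap<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (shouldReplace.test(entry.getValue())) {
                updates.put(entry.getKey(), replacement.apply(entry.getValue()));
            }
        }
        map.putAll(updates); // iteration has completed at this point
    }

    public static void main(String[] args) {
        Map<Integer, String> subtaskStates = new HashMap<>();
        subtaskStates.put(0, "with-in-flight-data");
        subtaskStates.put(1, "empty");

        replaceMatching(subtaskStates, v -> v.startsWith("with"), v -> "empty");
        System.out.println(subtaskStates.get(0)); // empty
    }
}
```

For a plain `Map`, `Map#replaceAll` would be a shorter alternative; the two-phase version above just keeps the "which entries change" decision separate from the write.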

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
##########
@@ -196,4 +196,21 @@
                                     .text(
                                             "Forces unaligned checkpoints, particularly allowing them for iterative jobs.")
                                     .build());
+
+    public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data")

Review comment:
      It's not clear to me from the name alone that it only applies to recovery.
   How about:
   - `execution.checkpointing.recover-without-channel-state.checkpoint-id`?
   - `execution.checkpointing.unaligned.allow-unsafe-recovery-from-checkpoint-id`?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

