lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r944072971


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{
+        acknowledgeCloseGatewayFutureMap
+                .values()
+                .forEach(x -> x.completeExceptionally(new 
FlinkException(message)));
+        acknowledgeCloseGatewayFutureMap.clear();
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally(

Review Comment:
   This name is inconsistent with 
`abortAllPendingAcknowledgeCloseGatewayFutures(...)`.
   
   How about `abortAcknowledgeCloseGatewayFuture(...)`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{
+        acknowledgeCloseGatewayFutureMap
+                .values()
+                .forEach(x -> x.completeExceptionally(new 
FlinkException(message)));
+        acknowledgeCloseGatewayFutureMap.clear();
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally(
+            int subtask, String message, @Nullable Throwable reason) {
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            Exception exception = new FlinkException(message, reason);
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception);
+        }
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, 
long checkpointId) {

Review Comment:
   It seems simpler to name it `completeAcknowledgeCloseGatewayFuture()` for 
the same reason mentioned above.
   
   Whether this entry is kept in the map is an implementation detail. And it 
makes no sense to keep a completed future in the map.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -151,6 +166,10 @@ private OperatorCoordinatorHolder(
         this.operatorMaxParallelism = operatorMaxParallelism;
 
         this.subtaskGatewayMap = new HashMap<>();
+        this.acknowledgeCloseGatewayFutureMap = new HashMap<>();
+        this.currentPendingCheckpointId = NO_CHECKPOINT;

Review Comment:
   It seems better to use `this.currentPendingCheckpointId = 
OperatorCoordinator.NO_CHECKPOINT` directly to be consistent with existing code 
that reads `NO_CHECKPOINT`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -299,9 +364,78 @@ public void resetToCheckpoint(long checkpointId, @Nullable 
byte[] checkpointData
     }
 
     private void checkpointCoordinatorInternal(
-            final long checkpointId, final CompletableFuture<byte[]> result) {
+            long checkpointId, CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
+        try {
+            if (currentPendingCheckpointId != NO_CHECKPOINT
+                    && currentPendingCheckpointId != checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Cannot checkpoint coordinator for checkpoint 
%d, "
+                                        + "since checkpoint %d has already 
started.",
+                                checkpointId, currentPendingCheckpointId));
+            }
+
+            if (latestAttemptedCheckpointId >= checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Regressing checkpoint IDs. Previous 
checkpointId = %d, new checkpointId = %d",
+                                latestAttemptedCheckpointId, checkpointId));
+            }
+
+            subtaskGatewayMap.forEach(
+                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
+
+            
Preconditions.checkState(acknowledgeCloseGatewayFutureMap.isEmpty());
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+            globalFailureHandler.handleGlobalFailure(t);
+            return;
+        }
+
+        currentPendingCheckpointId = checkpointId;
+        latestAttemptedCheckpointId = checkpointId;
+
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new 
CompletableFuture<>());
+            final OperatorEvent closeGatewayEvent = new 
CloseGatewayEvent(checkpointId, subtask);
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            closeGatewayEvent,
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // If the close gateway event failed to 
reach the subtask for
+                                    // some reason, the coordinator would 
trigger a fail-over on
+                                    // the subtask if the subtask is still 
running. This behavior
+                                    // also guarantees that the coordinator 
won't receive more
+                                    // events from this subtask before the 
current checkpoint
+                                    // finishes, which is equivalent to 
receiving ACK from this
+                                    // subtask.
+                                    if (!(failure instanceof 
TaskNotRunningException)) {
+                                        boolean isFailoverTriggered =
+                                                subtaskGatewayMap
+                                                        .get(subtask)
+                                                        
.tryTriggerTaskFailover(

Review Comment:
   Do we need to preserve `Runnables.assertNoException(...)` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -299,9 +364,78 @@ public void resetToCheckpoint(long checkpointId, @Nullable 
byte[] checkpointData
     }
 
     private void checkpointCoordinatorInternal(
-            final long checkpointId, final CompletableFuture<byte[]> result) {
+            long checkpointId, CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
+        try {
+            if (currentPendingCheckpointId != NO_CHECKPOINT
+                    && currentPendingCheckpointId != checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Cannot checkpoint coordinator for checkpoint 
%d, "
+                                        + "since checkpoint %d has already 
started.",
+                                checkpointId, currentPendingCheckpointId));
+            }
+
+            if (latestAttemptedCheckpointId >= checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Regressing checkpoint IDs. Previous 
checkpointId = %d, new checkpointId = %d",
+                                latestAttemptedCheckpointId, checkpointId));
+            }
+
+            subtaskGatewayMap.forEach(
+                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
+
+            
Preconditions.checkState(acknowledgeCloseGatewayFutureMap.isEmpty());
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+            globalFailureHandler.handleGlobalFailure(t);
+            return;
+        }
+
+        currentPendingCheckpointId = checkpointId;
+        latestAttemptedCheckpointId = checkpointId;
+
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new 
CompletableFuture<>());
+            final OperatorEvent closeGatewayEvent = new 
CloseGatewayEvent(checkpointId, subtask);
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            closeGatewayEvent,
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // If the close gateway event failed to 
reach the subtask for
+                                    // some reason, the coordinator would 
trigger a fail-over on
+                                    // the subtask if the subtask is still 
running. This behavior
+                                    // also guarantees that the coordinator 
won't receive more
+                                    // events from this subtask before the 
current checkpoint
+                                    // finishes, which is equivalent to 
receiving ACK from this
+                                    // subtask.
+                                    if (!(failure instanceof 
TaskNotRunningException)) {
+                                        boolean isFailoverTriggered =
+                                                subtaskGatewayMap
+                                                        .get(subtask)
+                                                        
.tryTriggerTaskFailover(
+                                                                
closeGatewayEvent, failure);
+                                        if (isFailoverTriggered) {

Review Comment:
   Prior to this PR, `result` was completed exceptionally if `failure != 
null`. This behavior seems pretty intuitive.
   
   Can you explain why we should complete `result` successfully if 
`isFailoverTriggered == false`? Would it be simpler to remove this check?
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -220,12 +228,11 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
 
         // Gateways should always be marked and closed for a specific 
checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an 
unforeseen checkpoint,
-        // exceptions should be thrown.
+        // which might happen when the coordinator has been reset to a 
previous checkpoint, warn
+        // messages should be recorded.
         if (lastCheckpointId < checkpointId) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Gateway closed for different checkpoint: closed 
for = %d, expected = %d",
-                            currentCheckpointId, checkpointId));
+            LOG.warn("Trying to open gateway for unknown checkpoint: " + 
checkpointId);

Review Comment:
   Can you explain when we can have `lastCheckpointId < checkpointId`?
   
   Why was this not a problem before we updated the operator coordinator?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -75,6 +92,24 @@ void dispatchEventToHandlers(
         }
     }
 
+    void initializeOperatorEventGatewayState(
+            OperatorID operator, OperatorStateStore operatorStateStore) throws 
Exception {
+        getOperatorEventGateway(operator).initializeState(operatorStateStore);
+    }
+
+    void snapshotOperatorEventGatewayState(

Review Comment:
   snapshotOperatorEventGatewayState -> snapshotOperatorEventGateway



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{
+        acknowledgeCloseGatewayFutureMap
+                .values()
+                .forEach(x -> x.completeExceptionally(new 
FlinkException(message)));
+        acknowledgeCloseGatewayFutureMap.clear();
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally(
+            int subtask, String message, @Nullable Throwable reason) {
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            Exception exception = new FlinkException(message, reason);
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception);
+        }
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, 
long checkpointId) {
+        if (tryLogUnknownCheckpointId(checkpointId)) {
+            return;
+        }
+
+        // The coordinator holder may receive an acknowledgement event after 
the checkpoint
+        // corresponding to the event has been aborted, or even after a new 
checkpoint has started.
+        // The acknowledgement event should be ignored in these cases.
+        if (checkpointId < latestAttemptedCheckpointId) {
+            return;
         }
+
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get());
+        }
+    }
+
+    private boolean tryLogUnknownCheckpointId(long checkpointId) {

Review Comment:
   `tryLogUnknownCheckpointId` -> `isUnknownCheckpointId`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{

Review Comment:
   How about `abortAcknowledgeCloseGatewayFutures()`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{
+        acknowledgeCloseGatewayFutureMap
+                .values()
+                .forEach(x -> x.completeExceptionally(new 
FlinkException(message)));
+        acknowledgeCloseGatewayFutureMap.clear();
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally(
+            int subtask, String message, @Nullable Throwable reason) {
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            Exception exception = new FlinkException(message, reason);
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception);
+        }
+    }
+
+    private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, 
long checkpointId) {
+        if (tryLogUnknownCheckpointId(checkpointId)) {
+            return;
+        }
+
+        // The coordinator holder may receive an acknowledgement event after 
the checkpoint
+        // corresponding to the event has been aborted, or even after a new 
checkpoint has started.
+        // The acknowledgement event should be ignored in these cases.
+        if (checkpointId < latestAttemptedCheckpointId) {
+            return;
         }
+
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get());
+        }
+    }
+
+    private boolean tryLogUnknownCheckpointId(long checkpointId) {
+        // This case might happen when the coordinator has been reset to a 
previous checkpoint, and

Review Comment:
   Remove `, and warn messages should be recorded` for simplicity.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -127,6 +130,15 @@
      */
     private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;
 
+    /**
+     * A map that manages a completable future for each subtask. It helps to 
guarantee that when the

Review Comment:
   The sentence `It helps to guarantee that when ... until the checkpoint is 
completed or aborted` seems unrelated to the semantics of this variable. It 
seems simpler to remove this part.
   
   And it seems useful to mention the semantics of this map w.r.t. 
`currentPendingCheckpointId`. After all, the future is completed successfully 
iff the current execution of the given subtask has successfully completed the 
checkpoint whose id = `currentPendingCheckpointId`.
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -83,41 +118,98 @@ public void registerEventHandler(OperatorID operator, 
OperatorEventHandler handl
         }
     }
 
-    Set<OperatorID> getRegisteredOperators() {
-        return handlers.keySet();
+    boolean containsOperatorEventGateway(OperatorID operatorId) {

Review Comment:
   Would it be simpler to remove this method and put the check in the above 
three methods?
   
   We can rename these methods as either `maybeXXX` or `XXXIfExists`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -75,6 +92,24 @@ void dispatchEventToHandlers(
         }
     }
 
+    void initializeOperatorEventGatewayState(

Review Comment:
   initializeOperatorEventGatewayState -> initializeOperatorEventGateway



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -75,6 +92,24 @@ void dispatchEventToHandlers(
         }
     }
 
+    void initializeOperatorEventGatewayState(
+            OperatorID operator, OperatorStateStore operatorStateStore) throws 
Exception {
+        getOperatorEventGateway(operator).initializeState(operatorStateStore);
+    }
+
+    void snapshotOperatorEventGatewayState(
+            OperatorID operator, OperatorStateStore operatorStateStore) throws 
Exception {
+        getOperatorEventGateway(operator).snapshotState(operatorStateStore);
+    }
+
+    void notifyOperatorSnapshotStateCompleted(

Review Comment:
   notifyOperatorSnapshotStateCompleted -> notifyOperatorSnapshotCompleted



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -220,12 +228,11 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
 
         // Gateways should always be marked and closed for a specific 
checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an 
unforeseen checkpoint,
-        // exceptions should be thrown.
+        // which might happen when the coordinator has been reset to a 
previous checkpoint, warn
+        // messages should be recorded.
         if (lastCheckpointId < checkpointId) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Gateway closed for different checkpoint: closed 
for = %d, expected = %d",
-                            currentCheckpointId, checkpointId));
+            LOG.warn("Trying to open gateway for unknown checkpoint: " + 
checkpointId);

Review Comment:
   Can you explain when we can have `lastCheckpointId < checkpointId`?
   
   Why was this not a problem before we updated the operator coordinator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to