lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r941388114


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -312,25 +423,97 @@ private void checkpointCoordinatorInternal(
                             } else if (closeGateways(checkpointId)) {
                                 
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot close the gateway, this means 
the checkpoint
-                                // has been aborted before, so the future is 
already
-                                // completed exceptionally. but we try to 
complete it here
-                                // again, just in case, as a safety net.
+                                // if we cannot close the gateway, this means 
the checkpoint has
+                                // been aborted before, so the future is 
already completed
+                                // exceptionally. but we try to complete it 
here again, just in
+                                // case, as a safety net.
                                 result.completeExceptionally(
                                         new FlinkException("Cannot close 
gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
-        try {
-            subtaskGatewayMap.forEach(
-                    (subtask, gateway) -> 
gateway.markForCheckpoint(checkpointId));
-            coordinator.checkpointCoordinator(checkpointId, 
coordinatorCheckpoint);
-        } catch (Throwable t) {
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            result.completeExceptionally(t);
-            globalFailureHandler.handleGlobalFailure(t);
+        FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values())
+                .handleAsync(
+                        (success, failure) -> {
+                            if (failure != null) {
+                                result.completeExceptionally(failure);
+                            } else {
+                                try {
+                                    coordinator.checkpointCoordinator(
+                                            checkpointId, 
coordinatorCheckpoint);
+                                } catch (Throwable t) {
+                                    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                                    result.completeExceptionally(t);
+                                    
globalFailureHandler.handleGlobalFailure(t);
+                                }
+                            }
+                            return null;
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) 
{
+        if (acknowledgeCloseGatewayFutureMap.isEmpty()) {
+            return;
+        }
+
+        for (int subtask : acknowledgeCloseGatewayFutureMap.keySet()) {
+            completeAcknowledgeCloseGatewayFutureExceptionally(subtask, 
message, null);
+        }
+    }
+
+    private void completeAcknowledgeCloseGatewayFutureExceptionally(
+            int subtask, String message, @Nullable Throwable reason) {
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            Exception exception = new FlinkException(message, reason);
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception);
+        }
+    }
+
+    private void completeAcknowledgeCloseGatewayFuture(int subtask, long 
checkpointId) {
+        // The coordinator holder may receive an acknowledgement event after 
the checkpoint
+        // corresponding to the event has been aborted, or even after a new 
checkpoint has started.
+        // The acknowledgement event should be ignored in these cases.
+        if (!isCurrentPendingCheckpoint(checkpointId)) {
+            return;
+        }
+
+        if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) {
+            
acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get());
+        }
+    }
+
+    /**
+     * Checks whether a provided checkpoint id corresponds to the current 
pending checkpoint.
+     *
+     * @return true if the provided id corresponds to the current checkpoint, 
false if the id
+     *     corresponds to a previous checkpoint, or there is no pending 
checkpoint currently.
+     * @throws IllegalArgumentException if the coordinator holder has never 
attempted a checkpoint with the
+     *     provided id.
+     */
+    private boolean isCurrentPendingCheckpoint(long checkpointId) {
+        if (checkpointId > latestAttemptedCheckpointId) {
+            throw new IllegalArgumentException(
+                    "The provided checkpoint id "
+                            + checkpointId
+                            + " is related to a newer checkpoint that is 
unknown to the coordinator holder.");
+        }
+
+        if (currentPendingCheckpointId == NO_CHECKPOINT) {
+            return false;
+        } else {
+            if (latestAttemptedCheckpointId != currentPendingCheckpointId) {

Review Comment:
   Neither `latestAttemptedCheckpointId` nor `currentPendingCheckpointId` is 
updated in this method. Do we need to make this check here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to