lindong28 commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r941388114
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,25 +423,97 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { + if (acknowledgeCloseGatewayFutureMap.isEmpty()) { + return; + } + + for (int subtask : acknowledgeCloseGatewayFutureMap.keySet()) { + completeAcknowledgeCloseGatewayFutureExceptionally(subtask, message, null); + } + } + + private 
void completeAcknowledgeCloseGatewayFutureExceptionally( + int subtask, String message, @Nullable Throwable reason) { + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + Exception exception = new FlinkException(message, reason); + acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception); + } + } + + private void completeAcknowledgeCloseGatewayFuture(int subtask, long checkpointId) { + // The coordinator holder may receive an acknowledgement event after the checkpoint + // corresponding to the event has been aborted, or even after a new checkpoint has started. + // The acknowledgement event should be ignored in these cases. + if (!isCurrentPendingCheckpoint(checkpointId)) { + return; + } + + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get()); + } + } + + /** + * Checks whether a provided checkpoint id corresponds to the current pending checkpoint. + * + * @return true if the provided id corresponds to the current checkpoint, false if the id + * corresponds to a previous checkpoint, or there is no pending checkpoint currently. + * @throws IllegalArgumentException if the coordinator holder has never seen a checkpoint with the + * provided id. + */ + private boolean isCurrentPendingCheckpoint(long checkpointId) { + if (checkpointId > latestAttemptedCheckpointId) { + throw new IllegalArgumentException( + "The provided checkpoint id " + + checkpointId + + " is related to a newer checkpoint that is unknown to the coordinator holder."); + } + + if (currentPendingCheckpointId == NO_CHECKPOINT) { + return false; + } else { + if (latestAttemptedCheckpointId != currentPendingCheckpointId) { Review Comment: Neither `latestAttemptedCheckpointId` nor `currentPendingCheckpointId` is updated in this method. Do we need to make this check here? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org