lindong28 commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r944072971
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { + acknowledgeCloseGatewayFutureMap + .values() + .forEach(x -> x.completeExceptionally(new FlinkException(message))); + acknowledgeCloseGatewayFutureMap.clear(); + } + + private void 
completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally( Review Comment: This name is inconsistent with `abortAllPendingAcknowledgeCloseGatewayFuture(...)`. How about `abortAcknowledgeCloseGatewayFuture(...)`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { + 
acknowledgeCloseGatewayFutureMap + .values() + .forEach(x -> x.completeExceptionally(new FlinkException(message))); + acknowledgeCloseGatewayFutureMap.clear(); + } + + private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally( + int subtask, String message, @Nullable Throwable reason) { + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + Exception exception = new FlinkException(message, reason); + acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception); + } + } + + private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, long checkpointId) { Review Comment: It seems simpler to name it `completeAcknowledgeCloseGatewayFuture()` for the same reason mentioned above. Whether this entry is kept in the map is an implementation detail. And it makes no sense to keep a completed future in the map. ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -151,6 +166,10 @@ private OperatorCoordinatorHolder( this.operatorMaxParallelism = operatorMaxParallelism; this.subtaskGatewayMap = new HashMap<>(); + this.acknowledgeCloseGatewayFutureMap = new HashMap<>(); + this.currentPendingCheckpointId = NO_CHECKPOINT; Review Comment: It seems better to use `this.currentPendingCheckpointId = OperatorCoordinator.NO_CHECKPOINT` directly to be consistent with existing code that reads `NO_CHECKPOINT`. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -299,9 +364,78 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData } private void checkpointCoordinatorInternal( - final long checkpointId, final CompletableFuture<byte[]> result) { + long checkpointId, CompletableFuture<byte[]> result) { mainThreadExecutor.assertRunningInMainThread(); + try { + if (currentPendingCheckpointId != NO_CHECKPOINT + && currentPendingCheckpointId != checkpointId) { + throw new IllegalStateException( + String.format( + "Cannot checkpoint coordinator for checkpoint %d, " + + "since checkpoint %d has already started.", + checkpointId, currentPendingCheckpointId)); + } + + if (latestAttemptedCheckpointId >= checkpointId) { + throw new IllegalStateException( + String.format( + "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", + latestAttemptedCheckpointId, checkpointId)); + } + + subtaskGatewayMap.forEach( + (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); + + Preconditions.checkState(acknowledgeCloseGatewayFutureMap.isEmpty()); + } catch (Throwable t) { + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + return; + } + + currentPendingCheckpointId = checkpointId; + latestAttemptedCheckpointId = checkpointId; + + for (int subtask : subtaskGatewayMap.keySet()) { + acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>()); + final OperatorEvent closeGatewayEvent = new CloseGatewayEvent(checkpointId, subtask); + subtaskGatewayMap + .get(subtask) + .sendEventWithCallBackOnCompletion( + closeGatewayEvent, + (success, failure) -> { + if (failure != null) { + // If the close gateway event failed to reach the subtask for + // some reason, the coordinator would trigger a fail-over on + // the subtask if the subtask is still running. 
This behavior + // also guarantees that the coordinator won't receive more + // events from this subtask before the current checkpoint + // finishes, which is equivalent to receiving ACK from this + // subtask. + if (!(failure instanceof TaskNotRunningException)) { + boolean isFailoverTriggered = + subtaskGatewayMap + .get(subtask) + .tryTriggerTaskFailover( Review Comment: Do we need to preserve `Runnables.assertNoException(...)` here? ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -299,9 +364,78 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData } private void checkpointCoordinatorInternal( - final long checkpointId, final CompletableFuture<byte[]> result) { + long checkpointId, CompletableFuture<byte[]> result) { mainThreadExecutor.assertRunningInMainThread(); + try { + if (currentPendingCheckpointId != NO_CHECKPOINT + && currentPendingCheckpointId != checkpointId) { + throw new IllegalStateException( + String.format( + "Cannot checkpoint coordinator for checkpoint %d, " + + "since checkpoint %d has already started.", + checkpointId, currentPendingCheckpointId)); + } + + if (latestAttemptedCheckpointId >= checkpointId) { + throw new IllegalStateException( + String.format( + "Regressing checkpoint IDs. 
Previous checkpointId = %d, new checkpointId = %d", + latestAttemptedCheckpointId, checkpointId)); + } + + subtaskGatewayMap.forEach( + (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); + + Preconditions.checkState(acknowledgeCloseGatewayFutureMap.isEmpty()); + } catch (Throwable t) { + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + return; + } + + currentPendingCheckpointId = checkpointId; + latestAttemptedCheckpointId = checkpointId; + + for (int subtask : subtaskGatewayMap.keySet()) { + acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>()); + final OperatorEvent closeGatewayEvent = new CloseGatewayEvent(checkpointId, subtask); + subtaskGatewayMap + .get(subtask) + .sendEventWithCallBackOnCompletion( + closeGatewayEvent, + (success, failure) -> { + if (failure != null) { + // If the close gateway event failed to reach the subtask for + // some reason, the coordinator would trigger a fail-over on + // the subtask if the subtask is still running. This behavior + // also guarantees that the coordinator won't receive more + // events from this subtask before the current checkpoint + // finishes, which is equivalent to receiving ACK from this + // subtask. + if (!(failure instanceof TaskNotRunningException)) { + boolean isFailoverTriggered = + subtaskGatewayMap + .get(subtask) + .tryTriggerTaskFailover( + closeGatewayEvent, failure); + if (isFailoverTriggered) { Review Comment: Prior to this PR, the `result` is completed with exception if `failure != null`. This behavior seems pretty intuitive. Can you explain why we should complete `result` successfully if `isFailoverTriggered == false`? Would it be simpler to remove this check? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ########## @@ -220,12 +228,11 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) { // Gateways should always be marked and closed for a specific checkpoint before it can be // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint, - // exceptions should be thrown. + // which might happen when the coordinator has been reset to a previous checkpoint, warn + // messages should be recorded. if (lastCheckpointId < checkpointId) { - throw new IllegalStateException( - String.format( - "Gateway closed for different checkpoint: closed for = %d, expected = %d", - currentCheckpointId, checkpointId)); + LOG.warn("Trying to open gateway for unknown checkpoint: " + checkpointId); Review Comment: Can you explain when we can have `lastCheckpointId < checkpointId`? Why was this not a problem before we updated the operator coordinator? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java: ########## @@ -75,6 +92,24 @@ void dispatchEventToHandlers( } } + void initializeOperatorEventGatewayState( + OperatorID operator, OperatorStateStore operatorStateStore) throws Exception { + getOperatorEventGateway(operator).initializeState(operatorStateStore); + } + + void snapshotOperatorEventGatewayState( Review Comment: snapshotOperatorEventGatewayState -> snapshotOperatorEventGateway ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. 
but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { + acknowledgeCloseGatewayFutureMap + .values() + .forEach(x -> x.completeExceptionally(new FlinkException(message))); + acknowledgeCloseGatewayFutureMap.clear(); + } + + private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally( + int subtask, String message, @Nullable Throwable reason) { + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + Exception exception = new FlinkException(message, reason); + acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception); + } + } + + private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, long checkpointId) { + if (tryLogUnknownCheckpointId(checkpointId)) { + return; + } + + // The 
coordinator holder may receive an acknowledgement event after the checkpoint + // corresponding to the event has been aborted, or even after a new checkpoint has started. + // The acknowledgement event should be ignored in these cases. + if (checkpointId < latestAttemptedCheckpointId) { + return; } + + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get()); + } + } + + private boolean tryLogUnknownCheckpointId(long checkpointId) { Review Comment: `tryLogUnknownCheckpointId` -> `IsUnknownCheckpointId`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. 
result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { Review Comment: How about `abortAcknowledgeCloseGatewayFutures()`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -312,26 +446,81 @@ private void checkpointCoordinatorInternal( } else if (closeGateways(checkpointId)) { completeCheckpointOnceEventsAreDone(checkpointId, result, success); } else { - // if we cannot close the gateway, this means the checkpoint - // has been aborted before, so the future is already - // completed exceptionally. but we try to complete it here - // again, just in case, as a safety net. + // if we cannot close the gateway, this means the checkpoint has + // been aborted before, so the future is already completed + // exceptionally. but we try to complete it here again, just in + // case, as a safety net. 
result.completeExceptionally( new FlinkException("Cannot close gateway")); } return null; }, mainThreadExecutor)); - try { - subtaskGatewayMap.forEach( - (subtask, gateway) -> gateway.markForCheckpoint(checkpointId)); - coordinator.checkpointCoordinator(checkpointId, coordinatorCheckpoint); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - result.completeExceptionally(t); - globalFailureHandler.handleGlobalFailure(t); + FutureUtils.combineAll(acknowledgeCloseGatewayFutureMap.values()) + .handleAsync( + (success, failure) -> { + if (failure != null) { + result.completeExceptionally(failure); + } else { + try { + coordinator.checkpointCoordinator( + checkpointId, coordinatorCheckpoint); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + result.completeExceptionally(t); + globalFailureHandler.handleGlobalFailure(t); + } + } + return null; + }, + mainThreadExecutor); + } + + private void abortAllPendingAcknowledgeCloseGatewayFutures(String message) { + acknowledgeCloseGatewayFutureMap + .values() + .forEach(x -> x.completeExceptionally(new FlinkException(message))); + acknowledgeCloseGatewayFutureMap.clear(); + } + + private void completeAndRemoveAcknowledgeCloseGatewayFutureExceptionally( + int subtask, String message, @Nullable Throwable reason) { + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + Exception exception = new FlinkException(message, reason); + acknowledgeCloseGatewayFutureMap.remove(subtask).completeExceptionally(exception); + } + } + + private void completeAndRemoveAcknowledgeCloseGatewayFuture(int subtask, long checkpointId) { + if (tryLogUnknownCheckpointId(checkpointId)) { + return; + } + + // The coordinator holder may receive an acknowledgement event after the checkpoint + // corresponding to the event has been aborted, or even after a new checkpoint has started. + // The acknowledgement event should be ignored in these cases. 
+ if (checkpointId < latestAttemptedCheckpointId) { + return; } + + if (acknowledgeCloseGatewayFutureMap.containsKey(subtask)) { + acknowledgeCloseGatewayFutureMap.remove(subtask).complete(Acknowledge.get()); + } + } + + private boolean tryLogUnknownCheckpointId(long checkpointId) { + // This case might happen when the coordinator has been reset to a previous checkpoint, and Review Comment: Remove `, and warn messages should be recorded` for simplicity. ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -127,6 +130,15 @@ */ private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap; + /** + * A map that manages a completable future for each subtask. It helps to guarantee that when the Review Comment: The sentence `It helps to guarantee that when ... until the checkpoint is completed or aborted` seems unrelated to the semantics of this variable. It seems simpler to remove this part. And it seems useful to mention the semantics of this map w.r.t. `currentPendingCheckpointId`. After all, the future is completed successfully iff the current execution of the given subtask has successfully completed the checkpoint whose id = `currentPendingCheckpointId`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java: ########## @@ -83,41 +118,98 @@ public void registerEventHandler(OperatorID operator, OperatorEventHandler handl } } - Set<OperatorID> getRegisteredOperators() { - return handlers.keySet(); + boolean containsOperatorEventGateway(OperatorID operatorId) { Review Comment: Would it be simpler to remove this method and put the check in the above three methods? We can rename these methods as either `maybeXXX` or `XXXIfExists`. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java: ########## @@ -75,6 +92,24 @@ void dispatchEventToHandlers( } } + void initializeOperatorEventGatewayState( Review Comment: initializeOperatorEventGatewayState -> initializeOperatorEventGateway ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java: ########## @@ -75,6 +92,24 @@ void dispatchEventToHandlers( } } + void initializeOperatorEventGatewayState( + OperatorID operator, OperatorStateStore operatorStateStore) throws Exception { + getOperatorEventGateway(operator).initializeState(operatorStateStore); + } + + void snapshotOperatorEventGatewayState( + OperatorID operator, OperatorStateStore operatorStateStore) throws Exception { + getOperatorEventGateway(operator).snapshotState(operatorStateStore); + } + + void notifyOperatorSnapshotStateCompleted( Review Comment: notifyOperatorSnapshotStateCompleted -> notifyOperatorSnapshotCompleted ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ########## @@ -220,12 +228,11 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) { // Gateways should always be marked and closed for a specific checkpoint before it can be // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint, - // exceptions should be thrown. + // which might happen when the coordinator has been reset to a previous checkpoint, warn + // messages should be recorded. if (lastCheckpointId < checkpointId) { - throw new IllegalStateException( - String.format( - "Gateway closed for different checkpoint: closed for = %d, expected = %d", - currentCheckpointId, checkpointId)); + LOG.warn("Trying to open gateway for unknown checkpoint: " + checkpointId); Review Comment: Can you explain when we can have `lastCheckpointId < checkpointId`? 
Why was this not a problem before we updated the operator coordinator? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org