lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r939473333

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -135,6 +147,8 @@ private GlobalFailureHandler globalFailureHandler;
     private ComponentMainThreadExecutor mainThreadExecutor;
+    private long latestAttemptedCheckpointId;

Review Comment:
Should we explicitly initialize this variable?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -127,6 +130,15 @@ */
     private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;
+    /**
+     * A map that manages a completable future for each subtask. It helps to guarantee that when the
+     * coordinator starts doing a checkpoint, it will not receive events from its subtasks anymore,
+     * until the checkpoint is completed or aborted. This map is only read or modified in
+     * checkpoint-related processes, and note that concurrent execution attempt is currently
+     * guaranteed to be disabled when checkpoint is enabled.
+     */
+    private final Map<Integer, CompletableFuture<Acknowledge>> acknowledgeCloseGatewayFutureMap;

Review Comment:
Given the semantics of this map, do we need to update it whenever a subtask's execution changes (e.g. in `subtaskReset(...)`) so that its contents always stay consistent with those semantics?
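
A minimal sketch of one option, assuming that a reset subtask's pending acknowledgement should fail the in-flight checkpoint rather than be silently dropped. `CloseGatewayAckTracker` and its methods are hypothetical (not the PR's code), and `Void` stands in for Flink's `Acknowledge` so the snippet compiles on its own:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical tracker for per-subtask "close gateway" acknowledgements. */
class CloseGatewayAckTracker {
    private final Map<Integer, CompletableFuture<Void>> pending = new HashMap<>();

    /** Registers one pending acknowledgement per subtask; the result completes when all do. */
    CompletableFuture<Void> startCheckpoint(Set<Integer> subtasks) {
        if (!pending.isEmpty()) {
            // Verify instead of clearing, so stale futures are never dropped unnoticed.
            throw new IllegalStateException("Pending acknowledgements: " + pending.keySet());
        }
        for (int subtask : subtasks) {
            pending.put(subtask, new CompletableFuture<>());
        }
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]));
    }

    /** Called when a subtask acknowledges that its gateway is closed. */
    void acknowledge(int subtask) {
        CompletableFuture<Void> future = pending.remove(subtask);
        if (future != null) {
            future.complete(null);
        }
    }

    /**
     * Keeps the map consistent with its semantics when a subtask is reset: the old attempt's
     * acknowledgement will never arrive, so fail its future instead of leaving it dangling.
     */
    void subtaskReset(int subtask) {
        CompletableFuture<Void> future = pending.remove(subtask);
        if (future != null) {
            future.completeExceptionally(
                    new IllegalStateException("Subtask " + subtask + " was reset"));
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Failing the future makes the combined future complete exceptionally, so a checkpoint waiting on it fails fast instead of hanging.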

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();

Review Comment:
At the time `closeGatewayAndCheckpointCoordinator()` is invoked, is it possible that `acknowledgeCloseGatewayFutureMap` is not empty? If it must be empty, then it seems better to verify this in the method instead of clearing the map. If it can be non-empty, then some futures might never be completed (and thus leak), because we clear the map without completing them. Can this be a problem?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -221,7 +223,7 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
         // Gateways should always be marked and closed for a specific checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint,
         // exceptions should be thrown.
-        if (lastCheckpointId < checkpointId) {
+        if (lastCheckpointId != NO_CHECKPOINT && lastCheckpointId < checkpointId) {

Review Comment:
When can this method be called with `lastCheckpointId == NO_CHECKPOINT`?
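
To make the `clear()` question in the `closeGatewayAndCheckpointCoordinator()` hunk concrete, here is a hypothetical sketch (none of these names are from the PR) of the three choices: clearing silently, verifying emptiness, or completing stale futures before dropping them. `Void` again stands in for `Acknowledge`:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of the options discussed in the review comment. */
class ClearVersusVerify {

    /** Option A: clear() drops pending futures, so anyone waiting on them waits forever. */
    static void clearOnly(Map<Integer, CompletableFuture<Void>> futures) {
        futures.clear();
    }

    /** Option B: if the map must be empty here, fail fast when that invariant is broken. */
    static void verifyEmpty(Map<Integer, CompletableFuture<Void>> futures) {
        if (!futures.isEmpty()) {
            throw new IllegalStateException(
                    "Expected no pending acknowledgements but found " + futures.keySet());
        }
    }

    /** Option C: if a non-empty map is legitimate, complete stale futures before dropping them. */
    static void abortThenClear(Map<Integer, CompletableFuture<Void>> futures, long newCheckpointId) {
        futures.values()
                .forEach(
                        f ->
                                f.completeExceptionally(
                                        new IllegalStateException(
                                                "Superseded by checkpoint " + newCheckpointId)));
        futures.clear();
    }
}
```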

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java:
##########
@@ -62,8 +63,9 @@ public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
     }
     @Override
-    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) {
-        throw new UnsupportedOperationException();
+    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)

Review Comment:
It seems weird that a `FinishedOperatorChain` now needs to support event dispatch. Conceptually, a finished operator chain (or task) should not have to send or receive anything. Prior to this PR, did `FinishedOperatorChain` need to send or receive any events? If not, is there any way to keep the same behavior? If we have to change the behavior in order to support `CloseGatewayEvent`/`AcknowledgeCloseGatewayEvent`, can you explain the main difference between these two messages and all existing control-flow messages that makes this change necessary?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -1488,12 +1488,15 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
         } catch (Throwable t) {
             ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            if (getExecutionState() == ExecutionState.RUNNING
-                    || getExecutionState() == ExecutionState.INITIALIZING) {
+            if ((getExecutionState() == ExecutionState.RUNNING
+                            || getExecutionState() == ExecutionState.INITIALIZING)
+                    && !(t instanceof RejectedExecutionException)) {

Review Comment:
Can you explain why this PR needs to make this change?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -240,7 +261,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
+        mainThreadExecutor.execute(
+                () -> closeGatewayAndCheckpointCoordinator(checkpointId, result));

Review Comment:
`closeGateway` is conceptually part of the `CheckpointCoordinator` process, right? Why do we need to mention it explicitly in the method name?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -251,6 +273,7 @@ public void notifyCheckpointComplete(long checkpointId) {
         // scheduler's main thread executor
         mainThreadExecutor.execute(
                 () -> {
+                    acknowledgeCloseGatewayFutureMap.clear();

Review Comment:
Suppose the current checkpoint is indeed completed. Would it be more intuitive and readable to verify that `acknowledgeCloseGatewayFutureMap.isEmpty()` instead of clearing the map?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java:
##########
@@ -34,4 +34,23 @@ public AcknowledgeCheckpointEvent(long checkpointId) {
     long getCheckpointID() {
         return checkpointId;
     }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(checkpointId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {

Review Comment:
Where is this `equals()` needed? Should we additionally use `subtaskId` to uniquely identify an `AcknowledgeCheckpointEvent`? Same for `AcknowledgeCloseGatewayEvent`.
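
If `subtaskId` were added, `equals()` and `hashCode()` would both need to cover it to keep the `Object` contract. A minimal sketch, using a hypothetical class `AcknowledgeCheckpointEventWithSubtask` (the PR's event only carries `checkpointId`; Flink-specific supertypes are omitted to keep it self-contained):

```java
import java.util.Objects;

/**
 * Hypothetical variant of AcknowledgeCheckpointEvent that also records the sending subtask, so
 * acknowledgements for the same checkpoint from different subtasks are not considered equal.
 */
final class AcknowledgeCheckpointEventWithSubtask {
    private final long checkpointId;
    private final int subtaskId;

    AcknowledgeCheckpointEventWithSubtask(long checkpointId, int subtaskId) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
    }

    long getCheckpointID() {
        return checkpointId;
    }

    int getSubtaskId() {
        return subtaskId;
    }

    @Override
    public int hashCode() {
        // Combines exactly the fields compared in equals(), as the hashCode contract requires.
        return Objects.hash(checkpointId, subtaskId);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof AcknowledgeCheckpointEventWithSubtask)) {
            return false;
        }
        AcknowledgeCheckpointEventWithSubtask other = (AcknowledgeCheckpointEventWithSubtask) obj;
        return checkpointId == other.checkpointId && subtaskId == other.subtaskId;
    }
}
```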

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -266,6 +289,7 @@ public void notifyCheckpointAborted(long checkpointId) {
         // scheduler's main thread executor
         mainThreadExecutor.execute(
                 () -> {
+                    abortAllPendingAcknowledgeCloseGatewayFutures();

Review Comment:
Is it possible that `notifyCheckpointAborted(checkpointId=10)` is triggered while the operator coordinator is working on checkpointId=11? If so, it seems incorrect to invoke `abortAllPendingAcknowledgeCloseGatewayFutures()` for the ongoing checkpoint without checking whether `checkpointId == ongoingCheckpointId`. The same applies to `notifyCheckpointComplete(...)`.

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -83,41 +130,101 @@ public void registerEventHandler(OperatorID operator, OperatorEventHandler handl
         }
     }
-    Set<OperatorID> getRegisteredOperators() {
-        return handlers.keySet();
+    boolean isRegisteredOperator(OperatorID operatorId) {
+        return handlers.containsKey(operatorId) || gatewayMap.containsKey(operatorId);

Review Comment:
Can you explain the case where an `operatorId` is in `gatewayMap` but not in `handlers`?
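
Regarding the `notifyCheckpointAborted(...)` question above, a hypothetical sketch of the guard: pending futures are only aborted when the notification is for the checkpoint currently in progress. It also initializes `latestAttemptedCheckpointId` explicitly to a sentinel; the class name, the `NO_CHECKPOINT = -1L` value, and the method signatures are all assumptions, not Flink code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: abort notifications only affect the checkpoint they refer to. */
class CheckpointScopedAcks {
    static final long NO_CHECKPOINT = -1L; // hypothetical sentinel value

    // Explicitly initialized, so "no checkpoint attempted yet" is distinguishable from id 0.
    private long latestAttemptedCheckpointId = NO_CHECKPOINT;
    private final Map<Integer, CompletableFuture<Void>> pending = new HashMap<>();

    void startCheckpoint(long checkpointId, int parallelism) {
        latestAttemptedCheckpointId = checkpointId;
        for (int i = 0; i < parallelism; i++) {
            pending.put(i, new CompletableFuture<>());
        }
    }

    /** Returns true if the pending futures were aborted, false for a stale notification. */
    boolean notifyCheckpointAborted(long checkpointId) {
        if (checkpointId != latestAttemptedCheckpointId) {
            return false; // e.g. abort for 10 arriving while 11 is in progress: leave 11 alone
        }
        pending.values()
                .forEach(
                        f ->
                                f.completeExceptionally(
                                        new IllegalStateException(
                                                "Checkpoint " + checkpointId + " aborted")));
        pending.clear();
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The same check would apply to a completion notification for an older checkpoint.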

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>());
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            new CloseGatewayEvent(checkpointId),
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // The close gateway event failed to reach the subtask for some
+                                    // reason. For example, the subtask has finished. In this case
+                                    // it is guaranteed that the coordinator won't receive more
+                                    // events from this subtask before the current checkpoint
+                                    // finishes, which is equivalent to receiving ACK from this
+                                    // subtask.
+                                    completeAcknowledgeCloseGatewayFuture(subtask, checkpointId);
+
+                                    if (!(failure instanceof RejectedExecutionException

Review Comment:
`SubtaskGatewayImpl::sendEvent()` will invoke `tryTriggerTaskFailover()` regardless of the exception type. Can you explain why we need to check the exception type here?
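
A hypothetical sketch of the failure callback in this hunk with the ordering that the comment following this one proposes: failover is triggered before the failed delivery is treated as an implicit acknowledgement. `tryTriggerTaskFailover` and `completeAcknowledgeCloseGatewayFuture` here are stand-ins that only record call order, and the callback type `BiConsumer<Object, Throwable>` is an assumption. Failover is triggered unconditionally, mirroring how `sendEvent()` is described above; whether `RejectedExecutionException` should be exempt is the open question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical stand-ins that record the order in which the callback's steps run. */
class CloseGatewayCallbackSketch {
    final List<String> calls = new ArrayList<>();
    final CompletableFuture<Void> ackFuture = new CompletableFuture<>();

    void tryTriggerTaskFailover(int subtask, Throwable cause) {
        calls.add("failover:" + subtask);
    }

    void completeAcknowledgeCloseGatewayFuture(int subtask, long checkpointId) {
        calls.add("ack:" + subtask);
        ackFuture.complete(null);
    }

    BiConsumer<Object, Throwable> callbackFor(int subtask, long checkpointId) {
        return (success, failure) -> {
            if (failure != null) {
                // First ensure the subtask is being failed over, so it cannot send further
                // events; only then treat the failed delivery as an acknowledgement.
                tryTriggerTaskFailover(subtask, failure);
                completeAcknowledgeCloseGatewayFuture(subtask, checkpointId);
            }
            // On success, the real acknowledgement arrives later as a separate event.
        };
    }
}
```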

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>());
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            new CloseGatewayEvent(checkpointId),
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // The close gateway event failed to reach the subtask for some
+                                    // reason. For example, the subtask has finished. In this case
+                                    // it is guaranteed that the coordinator won't receive more
+                                    // events from this subtask before the current checkpoint
+                                    // finishes, which is equivalent to receiving ACK from this
+                                    // subtask.
+                                    completeAcknowledgeCloseGatewayFuture(subtask, checkpointId);

Review Comment:
Since this method relies on the assumption that the coordinator `won't receive more events from this subtask before the current checkpoint finishes`, and this assumption in turn relies on `tryTriggerTaskFailover()`, would it be safer to invoke `tryTriggerTaskFailover()` before invoking `completeAcknowledgeCloseGatewayFuture()`, to avoid any potential race condition?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org