lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r939473333


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -135,6 +147,8 @@
     private GlobalFailureHandler globalFailureHandler;
     private ComponentMainThreadExecutor mainThreadExecutor;
 
+    private long latestAttemptedCheckpointId;

Review Comment:
   Should we explicitly initialize this variable?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -127,6 +130,15 @@
      */
     private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;
 
+    /**
+     * A map that manages a completable future for each subtask. It helps to guarantee that when the
+     * coordinator starts doing a checkpoint, it will not receive events from its subtasks anymore,
+     * until the checkpoint is completed or aborted. This map is only read or modified in
+     * checkpoint-related processes, and note that concurrent execution attempt is currently
+     * guaranteed to be disabled when checkpoint is enabled.
+     */
+    private final Map<Integer, CompletableFuture<Acknowledge>> acknowledgeCloseGatewayFutureMap;

Review Comment:
   According to the semantics of this map, do we need to update this map 
whenever subtask execution is updated (e.g. in `subtaskReset(...)`) to make 
sure that this map's value is always consistent with its semantics?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
 
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();

Review Comment:
   At the time `closeGatewayAndCheckpointCoordinator()` is invoked, is it 
possible that `acknowledgeCloseGatewayFutureMap` is not empty?
   
   If it must be empty, then it seems better to verify this in the method 
instead of clearing it.
   
   If it can be non-empty, then some future objects might never be completed 
(and thus leak), because we clear the map without completing them. Can this 
be a problem?
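   
   To make the two alternatives concrete, here is a minimal sketch. The class 
and method names are hypothetical and `String` stands in for `Acknowledge`; 
this is not the PR's code. It either asserts that no future is pending, or 
completes the remaining futures exceptionally before dropping them so that 
nothing waits on them forever.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the holder's bookkeeping; not the actual Flink class.
class PendingAckFutures {
    private final Map<Integer, CompletableFuture<String>> futures = new HashMap<>();

    CompletableFuture<String> register(int subtask) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(subtask, future);
        return future;
    }

    // Option 1: the map is expected to be empty already; fail loudly otherwise.
    void checkEmpty() {
        if (!futures.isEmpty()) {
            throw new IllegalStateException("Pending futures: " + futures.keySet());
        }
    }

    // Option 2: complete every remaining future before removing it from the map.
    void abortAllAndClear(Throwable cause) {
        for (CompletableFuture<String> future : futures.values()) {
            future.completeExceptionally(cause);
        }
        futures.clear();
    }
}
```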



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -221,7 +223,7 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
         // Gateways should always be marked and closed for a specific checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint,
         // exceptions should be thrown.
-        if (lastCheckpointId < checkpointId) {
+        if (lastCheckpointId != NO_CHECKPOINT && lastCheckpointId < checkpointId) {

Review Comment:
   When can this method be called with `lastCheckpointId == NO_CHECKPOINT`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java:
##########
@@ -62,8 +63,9 @@ public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
     }
 
     @Override
-    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) {
-        throw new UnsupportedOperationException();
+    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)

Review Comment:
   It seems odd that a `FinishedOperatorChain` now needs to support event 
dispatch. Conceptually, a finished operator chain (or task) should not have to 
send or receive anything.
   
   Prior to this PR, did `FinishedOperatorChain` need to send or receive any 
events? If not, is there any way to keep the same behavior?
   
   If we have to change the behavior in order to support 
`CloseGatewayEvent`/`AcknowledgeCloseGatewayEvent`, can you explain the main 
difference between these two messages and all existing control flow messages 
that makes this change necessary?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -1488,12 +1488,15 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
             } catch (Throwable t) {
                 ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
-                if (getExecutionState() == ExecutionState.RUNNING
-                        || getExecutionState() == ExecutionState.INITIALIZING) {
+                if ((getExecutionState() == ExecutionState.RUNNING
+                                || getExecutionState() == ExecutionState.INITIALIZING)
+                        && !(t instanceof RejectedExecutionException)) {

Review Comment:
   Can you explain why this PR needs to make this change?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -240,7 +261,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
+        mainThreadExecutor.execute(
+                () -> closeGatewayAndCheckpointCoordinator(checkpointId, result));

Review Comment:
   `closeGateWay` is conceptually part of the `CheckpointCoordinator` process, 
right? Why do we need to explicitly mention this in the method name?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -251,6 +273,7 @@ public void notifyCheckpointComplete(long checkpointId) {
         // scheduler's main thread executor
         mainThreadExecutor.execute(
                 () -> {
+                    acknowledgeCloseGatewayFutureMap.clear();

Review Comment:
   If the current checkpoint has indeed completed, would it be more 
intuitive and readable to verify that 
`acknowledgeCloseGatewayFutureMap.isEmpty()` instead of clearing this map?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/AcknowledgeCheckpointEvent.java:
##########
@@ -34,4 +34,23 @@ public AcknowledgeCheckpointEvent(long checkpointId) {
     long getCheckpointID() {
         return checkpointId;
     }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(checkpointId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {

Review Comment:
   Where is this `equals()` needed? Should we additionally use `subtaskId` to 
uniquely identify an `AcknowledgeCheckpointEvent`?
   
   Same for `AcknowledgeCloseGatewayEvent`.
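   
   As a sketch of what that could look like: the event class below is 
hypothetical, and the `subtaskId` field is an assumption, since the 
`AcknowledgeCheckpointEvent` shown in the diff only carries `checkpointId`. 
Both fields take part in `equals()` and `hashCode()`.

```java
import java.util.Objects;

// Hypothetical event type for illustration; not the actual Flink class.
final class SubtaskScopedAckEvent {
    private final long checkpointId;
    private final int subtaskId; // assumed field, not present in the diff above

    SubtaskScopedAckEvent(long checkpointId, int subtaskId) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(checkpointId, subtaskId);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SubtaskScopedAckEvent)) {
            return false;
        }
        SubtaskScopedAckEvent other = (SubtaskScopedAckEvent) obj;
        // Two events are equal only if they refer to the same checkpoint and subtask.
        return checkpointId == other.checkpointId && subtaskId == other.subtaskId;
    }
}
```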



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -266,6 +289,7 @@ public void notifyCheckpointAborted(long checkpointId) {
         // scheduler's main thread executor
         mainThreadExecutor.execute(
                 () -> {
+                    abortAllPendingAcknowledgeCloseGatewayFutures();

Review Comment:
   Is it possible that `notifyCheckpointAborted(checkpointId=10)` is triggered 
while the operator coordinator is working on checkpointId=11?
   
   If yes, it seems incorrect to invoke 
`abortAllPendingAcknowledgeCloseGatewayFutures()` for the ongoing checkpoint 
without checking whether `checkpointId == ongoingCheckpointId`.
   
   Same for `notifyCheckpointComplete(...)`.
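   
   A minimal sketch of such a guard follows. The class, the `NO_CHECKPOINT` 
value, and the method shapes are assumptions for illustration (`String` 
stands in for `Acknowledge`); this is not the PR's code. A notification for 
an older checkpoint leaves the futures of the ongoing checkpoint untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the holder's bookkeeping; not the actual Flink class.
class CheckpointScopedFutures {
    private static final long NO_CHECKPOINT = -1L; // assumed sentinel value
    private long ongoingCheckpointId = NO_CHECKPOINT;
    private final Map<Integer, CompletableFuture<String>> pending = new HashMap<>();

    void startCheckpoint(long checkpointId, int numSubtasks) {
        ongoingCheckpointId = checkpointId;
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            pending.put(subtask, new CompletableFuture<>());
        }
    }

    // Returns true only if the notification refers to the ongoing checkpoint.
    boolean notifyCheckpointAborted(long checkpointId) {
        if (checkpointId != ongoingCheckpointId) {
            return false; // stale notification, e.g. for 10 while 11 is in progress
        }
        for (CompletableFuture<String> future : pending.values()) {
            future.completeExceptionally(
                    new IllegalStateException("Checkpoint " + checkpointId + " aborted"));
        }
        pending.clear();
        ongoingCheckpointId = NO_CHECKPOINT;
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```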



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -83,41 +130,101 @@ public void registerEventHandler(OperatorID operator, OperatorEventHandler handl
         }
     }
 
-    Set<OperatorID> getRegisteredOperators() {
-        return handlers.keySet();
+    boolean isRegisteredOperator(OperatorID operatorId) {
+        return handlers.containsKey(operatorId) || gatewayMap.containsKey(operatorId);

Review Comment:
   Can you explain the case where an `operatorId` is in `gatewayMap` but not in 
`handlers`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
 
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>());
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            new CloseGatewayEvent(checkpointId),
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // The close gateway event failed to reach the subtask for some
+                                    // reason. For example, the subtask has finished. In this case
+                                    // it is guaranteed that the coordinator won't receive more
+                                    // events from this subtask before the current checkpoint
+                                    // finishes, which is equivalent to receiving ACK from this
+                                    // subtask.
+                                    completeAcknowledgeCloseGatewayFuture(subtask, checkpointId);
+
+                                    if (!(failure instanceof RejectedExecutionException

Review Comment:
   `SubtaskGatewayImpl::sendEvent()` will invoke `tryTriggerTaskFailover()` 
regardless of the exception type. 
   
   Can you explain why we need to check the exception type here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -298,6 +322,86 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
         coordinator.resetToCheckpoint(checkpointId, checkpointData);
     }
 
+    private void closeGatewayAndCheckpointCoordinator(
+            long checkpointId, CompletableFuture<byte[]> result) {
+        mainThreadExecutor.assertRunningInMainThread();
+
+        latestAttemptedCheckpointId = checkpointId;
+
+        acknowledgeCloseGatewayFutureMap.clear();
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>());
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            new CloseGatewayEvent(checkpointId),
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // The close gateway event failed to reach the subtask for some
+                                    // reason. For example, the subtask has finished. In this case
+                                    // it is guaranteed that the coordinator won't receive more
+                                    // events from this subtask before the current checkpoint
+                                    // finishes, which is equivalent to receiving ACK from this
+                                    // subtask.
+                                    completeAcknowledgeCloseGatewayFuture(subtask, checkpointId);

Review Comment:
   This method relies on the assumption that the coordinator won't receive 
more events from this subtask before the current checkpoint finishes, and that 
assumption in turn relies on `tryTriggerTaskFailover()`. To avoid any 
potential race condition, would it be safer to invoke 
`tryTriggerTaskFailover()` before invoking 
`completeAcknowledgeCloseGatewayFuture()`?
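   
   The suggested ordering could be sketched as follows. The handler class and 
its methods are hypothetical and only record the order of the two steps; 
whether this ordering alone closes the race depends on how the failover 
actually runs, which the sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical callback handler for illustration; not the actual Flink code.
class SendFailureHandler {
    final List<String> steps = new ArrayList<>();
    final CompletableFuture<String> ackFuture = new CompletableFuture<>();

    void onSendFailure(Throwable failure) {
        // 1. Fail the subtask first, so that it can no longer send events.
        triggerTaskFailover(failure);
        // 2. Only then treat the subtask as if it had acknowledged.
        steps.add("completeAck");
        ackFuture.complete("ACK");
    }

    // Stand-in for tryTriggerTaskFailover(); here it only records the call.
    private void triggerTaskFailover(Throwable failure) {
        steps.add("failover");
    }
}
```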



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
