lindong28 commented on code in PR #20752: URL: https://github.com/apache/flink/pull/20752#discussion_r962646192
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -78,10 +80,9 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
         this.subtaskAccess = subtaskAccess;
         this.mainThreadExecutor = mainThreadExecutor;
         this.incompleteFuturesTracker = incompleteFuturesTracker;
-        this.blockedEvents = new ArrayList<>();
-        this.currentCheckpointId = NO_CHECKPOINT;
-        this.lastCheckpointId = Long.MIN_VALUE;
-        this.isClosed = false;
+        this.blockedEventsMap = new TreeMap<>();
+        this.latestActiveCheckpointId = NO_CHECKPOINT;
+        this.latestAttemptedCheckpointId = Long.MIN_VALUE;

Review Comment:
   Would it be better to replace `Long.MIN_VALUE` with `NO_CHECKPOINT`?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -63,13 +67,11 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
     private final IncompleteFuturesTracker incompleteFuturesTracker;

-    private final List<BlockedEvent> blockedEvents;
+    private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap;

-    private long currentCheckpointId;
+    private long latestActiveCheckpointId;

Review Comment:
   Would it be simpler to remove this variable and derive its value from `blockedEventsMap` when needed?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -221,38 +215,57 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
         // Gateways should always be marked and closed for a specific checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint,
         // exceptions should be thrown.
-        if (lastCheckpointId < checkpointId) {
+        if (latestAttemptedCheckpointId < checkpointId) {
             throw new IllegalStateException(
                     String.format(
                             "Gateway closed for different checkpoint: closed for = %d, expected = %d",
-                            currentCheckpointId, checkpointId));
+                            latestActiveCheckpointId, checkpointId));
         }

         // The message to open gateway with a specific checkpoint id might arrive after the
         // checkpoint has been aborted, or even after a new checkpoint has started. In these cases
         // this message should be ignored.
-        if (currentCheckpointId == NO_CHECKPOINT || checkpointId < lastCheckpointId) {
+        if (latestActiveCheckpointId == NO_CHECKPOINT

Review Comment:
   Would it be simpler to remove `latestActiveCheckpointId == NO_CHECKPOINT` from this boolean expression?
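
To illustrate the second review comment (deriving the latest active checkpoint ID from `blockedEventsMap` instead of storing it in a field), here is a minimal sketch. It rests on assumptions that the diff does not confirm: that the map holds exactly one entry per checkpoint for which the gateway is currently closed, and that `NO_CHECKPOINT` is a sentinel smaller than any real checkpoint ID. `GatewaySketch`, `markForCheckpoint`, `unmarkCheckpoint`, and the `String` event type are hypothetical stand-ins, not the actual Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for SubtaskGatewayImpl; not the actual Flink class. */
class GatewaySketch {
    // Assumed sentinel value; the diff does not show how NO_CHECKPOINT is defined.
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    // Assumption: one entry per checkpoint for which the gateway is currently closed.
    // String stands in for the BlockedEvent type, which is not shown in the diff.
    private final TreeMap<Long, List<String>> blockedEventsMap = new TreeMap<>();

    /** Records that the gateway is closed for the given checkpoint. */
    void markForCheckpoint(long checkpointId) {
        blockedEventsMap.putIfAbsent(checkpointId, new ArrayList<>());
    }

    /** Drops the entry (and any blocked events) for the given checkpoint. */
    void unmarkCheckpoint(long checkpointId) {
        blockedEventsMap.remove(checkpointId);
    }

    /** Derived on demand: the largest key in the map, or NO_CHECKPOINT if the map is empty. */
    long latestActiveCheckpointId() {
        return blockedEventsMap.isEmpty() ? NO_CHECKPOINT : blockedEventsMap.lastKey();
    }
}
```

Under these assumptions the derived value cannot drift out of sync with the map, at the cost of a `lastKey()` lookup on each read. Whether this matches the intended semantics of `latestActiveCheckpointId` in the PR is for the author to confirm.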