StephanEwen commented on pull request #12234: URL: https://github.com/apache/flink/pull/12234#issuecomment-636172362
Updated the PR to address the issue raised by @becketqin.

This required a change in the `OperatorCoordinator` interface. To explain why, let's revisit the semantics we want to offer.

The exactly-once semantics for the `OperatorCoordinator` are defined as follows:

- The point in time when the checkpoint future is completed is considered the point in time when the coordinator's checkpoint takes place.
- The `OperatorCoordinator` implementation must have a way of strictly ordering the sending of events and the completion of the checkpoint future (for example, the same thread does both actions, or both actions are guarded by a mutex).
- Every event sent before the checkpoint future is completed is considered to be before the checkpoint.
- Every event sent after the checkpoint future is completed is considered to be after the checkpoint.

The previous interface did not allow us to observe this point accurately. The future was created inside the application-specific `OperatorCoordinator` code and returned from the method. By the time the scheduler/checkpointing code could observe the future (attach handlers to it), a small amount of time had inevitably passed. Within that time, the future could already have been completed and some events could already have been sent. In that case the scheduler/checkpointing code could not determine which events were sent before the completion of the future and which were sent after it.

We hence need to change the checkpointing method from
```java
CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
```
to
```java
void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
```
The changed interface passes the future from the scheduler/checkpointing code into the coordinator.
The future already has synchronous handlers attached to it, which mark exactly the point at which the future was completed. This allows the scheduler/checkpointing code to observe the correct order in which the `OperatorCoordinator` implementation performed its actions (event sending, future completion).
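
To illustrate the ordering argument, here is a minimal, self-contained sketch. It does not use Flink classes: `CheckpointOrderingSketch`, `ToyCoordinator`, `sendEvent`, and the shared `log` list are hypothetical names made up for this example, and only the `checkpointCoordinator(long, CompletableFuture<byte[]>)` signature is taken from the change described above. The sketch assumes the coordinator guards both event sending and future completion with one lock, which is one of the two ordering options mentioned in the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CheckpointOrderingSketch {

    /** Hypothetical stand-in carrying the changed method signature. */
    interface Coordinator {
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
    }

    /** Toy coordinator: a single lock orders event sending and future completion. */
    static final class ToyCoordinator implements Coordinator {
        private final Object lock = new Object();
        private final List<String> log; // stands in for the channel to the tasks

        ToyCoordinator(List<String> log) {
            this.log = log;
        }

        void sendEvent(String event) {
            synchronized (lock) {
                log.add("event:" + event);
            }
        }

        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
            synchronized (lock) {
                // Completing the future is the checkpoint. Handlers attached
                // beforehand run right here, inside complete(), under the lock.
                result.complete(new byte[0]);
            }
        }
    }

    /** Plays the role of the scheduler/checkpointing code. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyCoordinator coordinator = new ToyCoordinator(log);

        coordinator.sendEvent("a");

        // The caller creates the future and attaches its handler BEFORE the
        // coordinator ever sees it, so the completion point cannot be missed.
        long checkpointId = 1L;
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        result.whenComplete((state, error) -> log.add("checkpoint:" + checkpointId));
        coordinator.checkpointCoordinator(checkpointId, result);

        coordinator.sendEvent("b");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [event:a, checkpoint:1, event:b]
    }
}
```

Because the handler is registered on a not-yet-completed future with the non-async `whenComplete`, it runs inside the `complete()` call itself, so the checkpoint marker lands in the log exactly between the events sent before and after completion. With the old signature, the handler could only be attached after the method returned, by which point further events might already have been sent.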