StephanEwen opened a new pull request #12234: URL: https://github.com/apache/flink/pull/12234
## What is the purpose of the change

Ensures that sending of `OperatorEvents` from the `OperatorCoordinator` to the `StreamOperator` follows exactly-once semantics.

The checkpointing procedure is as follows:

- The `OperatorCoordinator` takes its checkpoint. All events it sent before that must be received by the operators before they take their checkpoint.
- Barriers must only be injected into the sources once all operator coordinators have completed their checkpoints, so that all barriers can be injected at the same time (otherwise some sources get barriers earlier, which makes checkpoint alignment worse).
- Between a coordinator completing its checkpoint and the last coordinator completing its checkpoint, events from the already-completed coordinator must not be transported.

Consider this example:

```
Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
```

Two coordinators trigger checkpoints at the same time. Coordinator two takes longer to complete, and in the meantime coordinator one sends more events.

Coordinator one emits events `c` and `d` after it finished its checkpoint, meaning the events must take effect after the checkpoint. But they are sent before the barrier injection, so the runtime task will see them before the checkpoint.

This PR introduces the `OperatorEventValve`, a "gateway" that events pass through on their way from the coordinator to the operator. When the coordinator completes its checkpoint, the valve is shut, and it is reopened after the barriers are injected. That way, the events sent between a coordinator's checkpoint completion and the barrier injection are "moved" to after the barriers.

**Failure Handling**

- The valve opens immediately when the currently triggering checkpoint is aborted.
- When a subtask fails, the events queued for that task are cleared and reported back to the coordinator as failed.
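The valve behavior and failure handling described above can be sketched roughly as follows. This is a minimal, hypothetical sketch, not the code from this PR: the class name `EventValveSketch`, its methods (`sendEvent`, `shut`, `open`, `resetForTask`), the use of plain `String` events, and the `BiConsumer` standing in for the RPC to a subtask are all assumptions made for illustration. Like the valve in this PR, it guards all of its operations under a single lock (here, `synchronized` methods).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
 * Hypothetical, simplified valve between a coordinator and its subtasks.
 * Illustrative only: this is not Flink's OperatorEventValve API.
 */
final class EventValveSketch {

    /** An event held back while the valve is shut. */
    private static final class BlockedEvent {
        final int subtask;
        final String event;
        final CompletableFuture<Void> result;

        BlockedEvent(int subtask, String event, CompletableFuture<Void> result) {
            this.subtask = subtask;
            this.event = event;
            this.result = result;
        }
    }

    /** Stand-in for the RPC that delivers an event to a subtask (assumption). */
    private final BiConsumer<Integer, String> sender;
    private final List<BlockedEvent> blocked = new ArrayList<>();
    private boolean isShut;

    EventValveSketch(BiConsumer<Integer, String> sender) {
        this.sender = sender;
    }

    /** Sends immediately while open; otherwise queues the event in arrival order. */
    synchronized CompletableFuture<Void> sendEvent(int subtask, String event) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (isShut) {
            blocked.add(new BlockedEvent(subtask, event, result));
        } else {
            sender.accept(subtask, event);
            result.complete(null);
        }
        return result;
    }

    /** Called when the coordinator's checkpoint future completes. */
    synchronized void shut() {
        isShut = true;
    }

    /** Called after the barriers are injected, or when the checkpoint is aborted. */
    synchronized void open() {
        isShut = false;
        // Release held-back events in the order they were sent.
        for (BlockedEvent e : blocked) {
            sender.accept(e.subtask, e.event);
            e.result.complete(null);
        }
        blocked.clear();
    }

    /** Drops events queued for a failed subtask and reports them as failed. */
    synchronized void resetForTask(int subtask) {
        Iterator<BlockedEvent> it = blocked.iterator();
        while (it.hasNext()) {
            BlockedEvent e = it.next();
            if (e.subtask == subtask) {
                it.remove();
                e.result.completeExceptionally(
                        new IllegalStateException("subtask " + subtask + " failed"));
            }
        }
    }

    public static void main(String[] args) {
        // Replays coordinator one's timeline from the diagram above.
        List<String> delivered = new ArrayList<>();
        EventValveSketch valve = new EventValveSketch((task, ev) -> delivered.add(ev));
        valve.sendEvent(0, "a");
        valve.sendEvent(0, "b");
        valve.shut();            // coordinator one completed its checkpoint
        valve.sendEvent(0, "c"); // held back
        valve.sendEvent(0, "d"); // held back
        System.out.println(delivered); // prints [a, b]
        valve.open();            // barriers injected
        valve.sendEvent(0, "e");
        System.out.println(delivered); // prints [a, b, c, d, e]
    }
}
```

In this sketch, events `c` and `d` are sent after the checkpoint completes, so they reach the subtask only after the valve reopens. Because every method synchronizes on the same monitor, a `shut()` call is strictly ordered with respect to any later `sendEvent()` call.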
**Threading Model Assumptions**

- All operations in the `CheckpointCoordinator` must run in the same thread. This is currently the case.
- The `OperatorCoordinator` must have strict ordering between completing the checkpoint future and calling the `sendEventToOperator()` method. That is the case if both calls come from the same thread or are guarded by the same lock.

The `OperatorEventValve` currently guards its operations under a lock. A simpler and more robust design could be to move all operations (event forwarding to the operators and `checkpointCoordinator()` calls) to the scheduler executor.

## Brief change log

- Introduce `OperatorEventValve` and integrate it with the `OperatorCoordinatorCheckpointContext` and the `CheckpointCoordinator`.
- Introduce `CoordinatorEventsExactlyOnceITCase` to verify the scenario described above.

## Verifying this change

This change can currently only be verified by the included IT case.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**