StephanEwen opened a new pull request #12234:
URL: https://github.com/apache/flink/pull/12234


   ## What is the purpose of the change
   
   Ensures that `OperatorEvents` sent from the `OperatorCoordinator` to the `StreamOperator` follow exactly-once semantics.
   
   The checkpointing procedure is as follows:
     - The `OperatorCoordinator` takes its checkpoint. All events it sent before that point must be received by the operators before they take their checkpoint.
     - Barriers must only be injected into the sources once all operator coordinators have completed their checkpoints. This lets all barriers be injected at the same time. Otherwise some sources receive barriers earlier, which makes checkpoint alignment worse.
     - Between one coordinator completing its checkpoint and the last one completing its checkpoint, events must not be delivered to the operators.
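
   The barrier-injection rule in the second step can be expressed with `CompletableFuture.allOf`. This is a minimal, hypothetical sketch and not the `CheckpointCoordinator` code from this PR. It assumes that each coordinator checkpoint is represented by a `CompletableFuture<byte[]>`. The helper name `injectBarriersWhenAllComplete` is invented for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical sketch: inject barriers only after every coordinator checkpoint has completed. */
   public class BarrierInjectionSketch {

       static CompletableFuture<Void> injectBarriersWhenAllComplete(
               List<CompletableFuture<byte[]>> coordinatorCheckpoints, Runnable injectBarriers) {
           CompletableFuture<?>[] all = coordinatorCheckpoints.toArray(new CompletableFuture<?>[0]);
           // allOf completes only when the slowest coordinator is done. If any coordinator
           // fails, allOf completes exceptionally and injectBarriers is never run.
           return CompletableFuture.allOf(all).thenRun(injectBarriers);
       }

       public static void main(String[] args) {
           List<String> log = new ArrayList<>();
           CompletableFuture<byte[]> coordinatorOne = new CompletableFuture<>();
           CompletableFuture<byte[]> coordinatorTwo = new CompletableFuture<>();

           CompletableFuture<Void> injected = injectBarriersWhenAllComplete(
                   Arrays.asList(coordinatorOne, coordinatorTwo),
                   () -> log.add("barriers injected into all sources"));

           coordinatorOne.complete(new byte[0]);   // coordinator one finishes first
           System.out.println(injected.isDone());  // false: coordinator two is still running
           coordinatorTwo.complete(new byte[0]);   // the slower coordinator finishes
           System.out.println(injected.isDone());  // true
           System.out.println(log);                // [barriers injected into all sources]
       }
   }
   ```

   Because the dependent action is registered before either future completes, it runs synchronously inside the final `complete()` call. No source sees a barrier while any coordinator is still checkpointing.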
   
   Consider this example:
   ```
   Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
   Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
   ```
   
   Two coordinators trigger checkpoints at the same time. Coordinator two takes 
longer to complete, and in the meantime coordinator one sends more events.
   
   Coordinator one emits events `c` and `d` after it has completed its checkpoint, so these events must logically happen after the checkpoint. However, they are sent before the barriers are injected, so the runtime tasks would see them before the checkpoint.
   
   This PR introduces the `OperatorEventValve`, a "gateway" through which events pass on their way from the coordinator to the operator. When the coordinator completes its checkpoint, the valve is shut. It is reopened after the barriers have been injected. That way, the events sent between a coordinator's checkpoint completion and the barrier injection are "moved" to after the barriers.
   
   **Failure Handling**
   
     - The valve reopens immediately when the checkpoint currently being triggered is aborted.
     - When a subtask fails, the events queued for that subtask are cleared and reported back to the coordinator as failed.
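
   The two failure paths can be sketched with per-subtask queues. This sketch assumes that the coordinator learns the outcome of each event through a `CompletableFuture` returned when the event is sent. That mechanism, along with the class `ValveFailureSketch` and its methods, is hypothetical and not taken from the PR.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.BiConsumer;

   /** Hypothetical valve with per-subtask queues, showing abort and subtask-failure handling. */
   public class ValveFailureSketch {

       /** An event held back by the shut valve, plus the future reported to the coordinator. */
       static final class Pending {
           final String event;
           final CompletableFuture<Void> result = new CompletableFuture<>();

           Pending(String event) {
               this.event = event;
           }
       }

       private final BiConsumer<Integer, String> sink;   // delivers (subtask, event)
       private final Map<Integer, Queue<Pending>> blocked = new HashMap<>();
       private boolean shut;

       ValveFailureSketch(BiConsumer<Integer, String> sink) {
           this.sink = sink;
       }

       synchronized CompletableFuture<Void> send(int subtask, String event) {
           Pending pending = new Pending(event);
           if (shut) {
               blocked.computeIfAbsent(subtask, k -> new ArrayDeque<>()).add(pending);
           } else {
               sink.accept(subtask, event);
               pending.result.complete(null);
           }
           return pending.result;
       }

       synchronized void shut() {
           shut = true;
       }

       /** Called after barrier injection, and immediately when the checkpoint is aborted. */
       synchronized void open() {
           shut = false;
           for (Map.Entry<Integer, Queue<Pending>> entry : blocked.entrySet()) {
               for (Pending pending : entry.getValue()) {
                   sink.accept(entry.getKey(), pending.event);
                   pending.result.complete(null);
               }
           }
           blocked.clear();
       }

       /** A subtask failed: drop its queued events and report them to the coordinator as failed. */
       synchronized void resetForSubtask(int subtask, Throwable cause) {
           Queue<Pending> queue = blocked.remove(subtask);
           if (queue != null) {
               for (Pending pending : queue) {
                   pending.result.completeExceptionally(cause);
               }
           }
       }

       public static void main(String[] args) {
           List<String> delivered = new ArrayList<>();
           ValveFailureSketch valve = new ValveFailureSketch((st, ev) -> delivered.add(st + ":" + ev));

           valve.shut();
           CompletableFuture<Void> toFailedSubtask = valve.send(0, "c");
           CompletableFuture<Void> toHealthySubtask = valve.send(1, "d");

           valve.resetForSubtask(0, new RuntimeException("subtask 0 failed"));
           valve.open();   // e.g. the checkpoint was aborted

           System.out.println(delivered);                                   // [1:d]
           System.out.println(toFailedSubtask.isCompletedExceptionally());  // true
           System.out.println(toHealthySubtask.isDone());                   // true
       }
   }
   ```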
   
   **Threading Model Assumptions**
   
     - All operations in the `CheckpointCoordinator` must run in the same thread. This is currently the case.
     - The `OperatorCoordinator` must strictly order completing the checkpoint future and calling the `sendEventToOperator()` method. That is the case if both happen in the same thread or are guarded by the same lock.
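
   The second assumption can be illustrated with a sketch in which the checkpoint future is completed and events are sent under one shared lock. The valve is shut by a callback that runs synchronously inside `complete()`. Without such a common lock or thread, an event sent "after" completion could still reach the valve before it is shut. `OrderingSketch`, `coordinatorLock`, and the simplified valve state are all hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical sketch of ordering checkpoint completion and event sending via one lock. */
   public class OrderingSketch {

       private final Object coordinatorLock = new Object();
       private boolean valveShut;
       final List<String> delivered = new ArrayList<>();
       final List<String> heldBack = new ArrayList<>();

       /** Runtime side: shut the valve as soon as the coordinator's checkpoint future completes. */
       CompletableFuture<byte[]> triggerCheckpoint() {
           CompletableFuture<byte[]> future = new CompletableFuture<>();
           // Registered before completion, so it runs synchronously inside complete().
           future.thenRun(() -> {
               synchronized (coordinatorLock) {
                   valveShut = true;
               }
           });
           return future;
       }

       /** Coordinator side: completes the checkpoint future under the coordinator's lock. */
       void completeCheckpoint(CompletableFuture<byte[]> future) {
           synchronized (coordinatorLock) {
               future.complete(new byte[0]);
           }
       }

       /** Coordinator side: sends under the same lock, so sending is ordered with completion. */
       void sendEventToOperator(String event) {
           synchronized (coordinatorLock) {
               if (valveShut) {
                   heldBack.add(event);   // sent after completion: waits for the barriers
               } else {
                   delivered.add(event);  // sent before completion: passes straight through
               }
           }
       }

       public static void main(String[] args) throws InterruptedException {
           OrderingSketch sketch = new OrderingSketch();
           CompletableFuture<byte[]> checkpoint = sketch.triggerCheckpoint();

           sketch.sendEventToOperator("b");
           // The checkpoint is completed from a different thread than the one sending events.
           Thread checkpointThread = new Thread(() -> sketch.completeCheckpoint(checkpoint));
           checkpointThread.start();
           checkpointThread.join();
           sketch.sendEventToOperator("c");

           System.out.println(sketch.delivered + " " + sketch.heldBack); // [b] [c]
       }
   }
   ```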
   
   The `OperatorEventValve` currently guards its operations with a lock. A simpler and more robust design could be to move all operations (event forwarding to the operators and `checkpointCoordinator()` calls) to the scheduler executor.
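
   A rough sketch of that alternative follows. Here a plain single-threaded `ExecutorService` stands in for the scheduler executor, which is an assumption. Every valve operation is submitted as a task, so the state is confined to one thread and needs no lock. The executor's FIFO task order also yields the required ordering. The class and method names are hypothetical.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   /** Hypothetical lock-free valve: all state is confined to one single-threaded executor. */
   public class ExecutorValveSketch {

       private final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();

       // Only ever accessed from schedulerThread, so no lock is needed.
       private final Queue<String> blocked = new ArrayDeque<>();
       private final List<String> operatorInput = new ArrayList<>();
       private boolean shut;

       void sendEvent(String event) {
           schedulerThread.execute(() -> {
               if (shut) {
                   blocked.add(event);
               } else {
                   operatorInput.add(event);
               }
           });
       }

       void checkpointCompleted() {
           schedulerThread.execute(() -> shut = true);
       }

       void injectBarriersAndOpen() {
           schedulerThread.execute(() -> {
               operatorInput.add("|barrier|");
               shut = false;
               while (!blocked.isEmpty()) {
                   operatorInput.add(blocked.poll());
               }
           });
       }

       /** Copies the operator input on the scheduler thread; Future.get() makes the copy visible. */
       List<String> snapshot() {
           Callable<List<String>> copy = () -> new ArrayList<>(operatorInput);
           try {
               return schedulerThread.submit(copy).get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException e) {
               throw new IllegalStateException(e.getCause());
           }
       }

       void shutdown() {
           schedulerThread.shutdown();
       }

       public static void main(String[] args) {
           ExecutorValveSketch valve = new ExecutorValveSketch();
           valve.sendEvent("a");
           valve.checkpointCompleted();
           valve.sendEvent("c");
           valve.injectBarriersAndOpen();
           valve.sendEvent("e");
           System.out.println(valve.snapshot()); // [a, |barrier|, c, e]
           valve.shutdown();
       }
   }
   ```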
   
   ## Brief change log
   
     - Introduce `OperatorEventValve` and integrate it with the `OperatorCoordinatorCheckpointContext` and the `CheckpointCoordinator`.
     - Introduce `CoordinatorEventsExactlyOnceITCase` to verify the scenario described above.
   
   ## Verifying this change
   
   This change can currently only be verified by the included IT Case.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   