Hi Hong,

The checkpoint is triggered by the timer executor of the CheckpointCoordinator. It first triggers the checkpoint in the SourceCoordinator (which passes it on to the SplitEnumerator) and then in the SourceOperator. The checkpoint event is put into the SplitEnumerator's event loop, so it is executed in order with the other events queued there. You can see the details here.
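To make the event-loop point concrete, here is a toy sketch in plain Java. It is not Flink code: EnumeratorEventLoopSketch, ToyEnumerator, sendEvent and triggerCheckpoint are hypothetical names made up for illustration, and the single-threaded executor only stands in for the enumerator's event loop. It shows just one thing: when the snapshot request and the reader events go through the same queue, the snapshot contains exactly the events that were queued before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EnumeratorEventLoopSketch {

    /** Toy stand-in for a split enumerator; hypothetical, not a Flink class. */
    static final class ToyEnumerator {
        // Only ever touched from the event-loop thread, so no locking is needed.
        private final List<String> processedEvents = new ArrayList<>();

        void handleEvent(String event) {
            processedEvents.add(event);
        }

        List<String> snapshotState() {
            // Copy, so that later events cannot change an already taken snapshot.
            return new ArrayList<>(processedEvents);
        }
    }

    // A single thread with a FIFO queue: tasks run one at a time, in submission order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final ToyEnumerator enumerator = new ToyEnumerator();

    /** Queues an event from a (toy) reader for processing on the event loop. */
    CompletableFuture<Void> sendEvent(String event) {
        return CompletableFuture.runAsync(() -> enumerator.handleEvent(event), eventLoop);
    }

    /** Queues a snapshot request on the same event loop as the events. */
    CompletableFuture<List<String>> triggerCheckpoint() {
        return CompletableFuture.supplyAsync(enumerator::snapshotState, eventLoop);
    }

    void shutdown() {
        eventLoop.shutdown();
    }

    public static void main(String[] args) {
        EnumeratorEventLoopSketch coordinator = new EnumeratorEventLoopSketch();
        coordinator.sendEvent("event-0");                 // queued before the checkpoint
        CompletableFuture<List<String>> snapshot = coordinator.triggerCheckpoint();
        coordinator.sendEvent("event-1");                 // queued after the checkpoint
        System.out.println("snapshot = " + snapshot.join()); // prints "snapshot = [event-0]"
        coordinator.shutdown();
    }
}
```

The sketch deliberately leaves out what happens to events that a reader has already sent but that have not yet reached the queue when the snapshot is taken; it only illustrates the ordering inside the loop.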
Yours,
Hongshun

On Wed, May 17, 2023 at 11:39 PM Teoh, Hong <lian...@amazon.co.uk.invalid> wrote:

> Hi all,
>
> I’m writing a new source based on the FLIP-27 Source API, and I had some
> questions on the checkpointing mechanisms and associated guarantees. Would
> appreciate if someone more familiar with the API would be able to provide
> insights here!
>
> In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> SourceReader (running on TM). However, the SourceReader can send events to
> the SplitEnumerator. Given this, we have introduced a “loopback”
> communication mechanism from TM to JM, and I wonder if/how we handle this
> during checkpoints.
>
> Example of how data might be lost:
> 1. Checkpoint 123 triggered
> 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> 4. SourceReader takes checkpoint of state for checkpoint 123
> …
> 5. Checkpoint 123 completes
>
> Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> processed, There is now inconsistent state between SourceReader state and
> SplitEnumerator state. (SourceReader assumes OperatorEvent 1 is processed,
> whereas SplitEnumerator has not processed OperatorEvent 1)
>
> Do we have any mechanisms for mitigating this issue? For example, does the
> SplitEnumerator re-take the snapshot of state for a checkpoint if an
> OperatorEvent is sent before the checkpoint is complete?
>
> Regards,
> Hong