Hi,

I vaguely remember someone implementing a mechanism to deal with this. I think that, at least at some point (it might have changed since I last looked at it), the problem was solved by canceling the checkpoint in the scenario you described. However, off the top of my head I can remember neither the ticket number nor where the code for that is. I might also be completely wrong. If I don't forget, I will try to find it tomorrow.
Best,
Piotrek

On Wed, 17 May 2023 at 17:39, Teoh, Hong <lian...@amazon.co.uk.invalid> wrote:

> Hi all,
>
> I’m writing a new source based on the FLIP-27 Source API, and I had some
> questions on the checkpointing mechanisms and associated guarantees. Would
> appreciate if someone more familiar with the API would be able to provide
> insights here!
>
> In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> SourceReader (running on TM). However, the SourceReader can send events to
> the SplitEnumerator. Given this, we have introduced a “loopback”
> communication mechanism from TM to JM, and I wonder if/how we handle this
> during checkpoints.
>
> Example of how data might be lost:
> 1. Checkpoint 123 triggered
> 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> 4. SourceReader takes checkpoint of state for checkpoint 123
> …
> 5. Checkpoint 123 completes
>
> Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> processed. There is now inconsistent state between SourceReader state and
> SplitEnumerator state. (SourceReader assumes OperatorEvent 1 is processed,
> whereas SplitEnumerator has not processed OperatorEvent 1)
>
> Do we have any mechanisms for mitigating this issue? For example, does the
> SplitEnumerator re-take the snapshot of state for a checkpoint if an
> OperatorEvent is sent before the checkpoint is complete?
>
> Regards,
> Hong
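
To make the ordering in the quoted example concrete, here is a minimal plain-Java sketch of steps 1 to 5. It is only an illustration under stated assumptions: it does not use any Flink classes, and the names Enumerator, Reader, channel and snapshotsAgree are hypothetical stand-ins, not the FLIP-27 API. It says nothing about how Flink actually handles this case; it only shows why the two snapshots can disagree when the event is still in flight at the time the enumerator takes its snapshot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java simulation of the race; none of these types are Flink classes.
class LoopbackCheckpointSketch {

    /** Hypothetical stand-in for a SplitEnumerator: its state is the events it has processed. */
    static final class Enumerator {
        private final List<String> processed = new ArrayList<>();

        void handleEvent(String event) {
            processed.add(event);
        }

        List<String> snapshot() {
            return new ArrayList<>(processed);
        }
    }

    /** Hypothetical stand-in for a SourceReader: its state is the events it has sent. */
    static final class Reader {
        private final List<String> sent = new ArrayList<>();

        void send(String event, Queue<String> channel) {
            sent.add(event);    // reader mutates its own state immediately
            channel.add(event); // event is now in flight from TM to JM
        }

        List<String> snapshot() {
            return new ArrayList<>(sent);
        }
    }

    /** Delivers all in-flight events to the enumerator. */
    static void deliver(Queue<String> channel, Enumerator enumerator) {
        while (!channel.isEmpty()) {
            enumerator.handleEvent(channel.poll());
        }
    }

    /**
     * Replays the checkpoint and returns true if both snapshots describe the same events.
     * If sendAfterEnumeratorSnapshot is true, the order matches steps 2 to 4 of the example.
     */
    static boolean snapshotsAgree(boolean sendAfterEnumeratorSnapshot) {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader();
        Queue<String> channel = new ArrayDeque<>();

        if (!sendAfterEnumeratorSnapshot) {
            // Control case: the event is sent and processed before the checkpoint starts.
            reader.send("OperatorEvent 1", channel);
            deliver(channel, enumerator);
        }

        // Step 2: the enumerator snapshots its state for the checkpoint.
        List<String> enumeratorSnapshot = enumerator.snapshot();

        if (sendAfterEnumeratorSnapshot) {
            // Step 3: the reader sends the event and records that it did so.
            reader.send("OperatorEvent 1", channel);
        }

        // Step 4: the reader snapshots its state for the same checkpoint.
        List<String> readerSnapshot = reader.snapshot();

        // Step 5: the checkpoint completes; any in-flight event is processed only afterwards.
        deliver(channel, enumerator);

        return enumeratorSnapshot.equals(readerSnapshot);
    }

    public static void main(String[] args) {
        System.out.println("race ordering consistent: " + snapshotsAgree(true));
        System.out.println("control ordering consistent: " + snapshotsAgree(false));
    }
}
```

In this toy model the only difference between the two runs is whether the event reaches the enumerator before its snapshot is taken, which is the gap that canceling the checkpoint or re-taking the enumerator snapshot would have to close.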