Hi all, 

I’m writing a new source based on the FLIP-27 Source API, and I have some 
questions about the checkpointing mechanisms and the associated guarantees. I 
would appreciate it if someone more familiar with the API could provide some 
insight here!

In the FLIP-27 Source, we now have a SplitEnumerator (running on the JM) and a 
SourceReader (running on the TM). The SourceReader can also send events to the 
SplitEnumerator, which introduces a “loopback” communication channel from TM 
to JM. I wonder if/how this is handled during checkpoints.


Example of how data might be lost:
1. Checkpoint 123 triggered
2. SplitEnumerator takes checkpoint of state for checkpoint 123
3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
4. SourceReader takes checkpoint of state for checkpoint 123
…
5. Checkpoint 123 completes

Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once processed. 
The checkpointed SourceReader state and SplitEnumerator state are then 
inconsistent: the SourceReader snapshot assumes OperatorEvent 1 has been 
processed, whereas the SplitEnumerator snapshot was taken before OperatorEvent 1 
was processed.
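
To make the ordering concrete, here is a minimal toy simulation of the steps above in plain Java. It does not use any Flink classes: ToyEnumerator, ToyReader, the in-memory queue standing in for the TM-to-JM channel, and the integer event ids are all hypothetical names made up for illustration. It only models the sequence I am worried about and says nothing about what Flink actually does.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

class CheckpointRaceSketch {

    /** Hypothetical stand-in for SplitEnumerator state on the JM. */
    static final class ToyEnumerator {
        private final Set<Integer> processedEvents = new HashSet<>();

        void handleEvent(int eventId) {
            processedEvents.add(eventId);
        }

        Set<Integer> snapshot() {
            return new HashSet<>(processedEvents);
        }
    }

    /** Hypothetical stand-in for SourceReader state on the TM. */
    static final class ToyReader {
        private final Set<Integer> sentEvents = new HashSet<>();

        // Sending an event also mutates local state, as in step 3.
        void sendEvent(int eventId, Queue<Integer> channelToJm) {
            sentEvents.add(eventId);
            channelToJm.add(eventId);
        }

        Set<Integer> snapshot() {
            return new HashSet<>(sentEvents);
        }
    }

    /** Replays steps 2 to 5 and reports whether the two snapshots agree. */
    static boolean runScenario() {
        Queue<Integer> channelToJm = new ArrayDeque<>();
        ToyEnumerator enumerator = new ToyEnumerator();
        ToyReader reader = new ToyReader();

        // Step 2: enumerator snapshots its state for the checkpoint.
        Set<Integer> enumeratorSnapshot = enumerator.snapshot();

        // Step 3: reader sends event 1 and mutates its own state.
        reader.sendEvent(1, channelToJm);

        // Step 4: reader snapshots its state for the same checkpoint.
        Set<Integer> readerSnapshot = reader.snapshot();

        // The event reaches the enumerator only after its snapshot was taken.
        while (!channelToJm.isEmpty()) {
            enumerator.handleEvent(channelToJm.poll());
        }

        // Step 5: the checkpoint "completes" with these two snapshots.
        // Consistent means every event the reader believes it sent is
        // also reflected in the enumerator snapshot.
        return enumeratorSnapshot.containsAll(readerSnapshot);
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + runScenario());
    }
}
```

In this toy model the program prints "snapshots consistent: false", because the reader snapshot contains event 1 while the enumerator snapshot does not. Restoring from such a pair of snapshots is the situation I am asking about.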

Do we have any mechanisms for mitigating this issue? For example, does the 
SplitEnumerator re-take the snapshot of state for a checkpoint if an 
OperatorEvent is sent before the checkpoint is complete?

Regards,
Hong
