Hi all,

I’m writing a new source based on the FLIP-27 Source API, and I have some questions about the checkpointing mechanisms and the associated guarantees. I would appreciate it if someone more familiar with the API could provide some insights here!
In the FLIP-27 Source, we now have a SplitEnumerator (running on the JM) and a SourceReader (running on the TM), and the SourceReader can send events to the SplitEnumerator. This introduces a “loopback” communication channel from the TM to the JM, and I wonder if/how we handle it during checkpoints.

Example of how data might be lost:

1. Checkpoint 123 is triggered
2. SplitEnumerator takes a snapshot of its state for checkpoint 123
3. SourceReader sends OperatorEvent 1 and mutates its state to reflect this
4. SourceReader takes a snapshot of its state for checkpoint 123
…
5. Checkpoint 123 completes

Let’s assume OperatorEvent 1 would mutate the SplitEnumerator state once processed. The checkpointed SourceReader state and SplitEnumerator state are now inconsistent: the SourceReader snapshot assumes OperatorEvent 1 has been processed, whereas the SplitEnumerator snapshot was taken before OperatorEvent 1 was processed.

Do we have any mechanisms for mitigating this issue? For example, does the SplitEnumerator re-take its state snapshot for a checkpoint if an OperatorEvent is sent before the checkpoint completes?

Regards,
Hong
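
P.S. To make the concern concrete, here is a toy sketch of the interleaving in plain Java. None of these classes are Flink APIs: ToyEnumerator, ToyReader and replayScenario are hypothetical names I made up for illustration, and the model says nothing about what Flink actually does. It only replays steps 1-5 above under the assumption that the event reaches the enumerator after its snapshot has been taken.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: nothing here is a Flink API. */
public class CheckpointRaceSketch {

    /** Hypothetical stand-in for the enumerator on the JM; state = events processed. */
    static class ToyEnumerator {
        final List<String> processedEvents = new ArrayList<>();

        void handleEvent(String event) {
            processedEvents.add(event);
        }

        List<String> snapshot() {
            return new ArrayList<>(processedEvents);
        }
    }

    /** Hypothetical stand-in for the reader on the TM; state = events it has sent. */
    static class ToyReader {
        final List<String> sentEvents = new ArrayList<>();
        final List<String> inFlight = new ArrayList<>();

        void sendEvent(String event) {
            sentEvents.add(event); // reader state already reflects the event
            inFlight.add(event);   // but the event has not been delivered yet
        }

        List<String> snapshot() {
            return new ArrayList<>(sentEvents);
        }
    }

    /** Replays steps 1-5 and returns [enumeratorSnapshot, readerSnapshot]. */
    public static List<List<String>> replayScenario() {
        ToyEnumerator enumerator = new ToyEnumerator();
        ToyReader reader = new ToyReader();

        // Steps 1-2: checkpoint 123 is triggered and the enumerator snapshots first.
        List<String> enumeratorSnapshot = enumerator.snapshot();

        // Step 3: the reader sends OperatorEvent 1 and mutates its own state.
        reader.sendEvent("OperatorEvent 1");

        // Step 4: the reader snapshots its state for the same checkpoint.
        List<String> readerSnapshot = reader.snapshot();

        // Assumed ordering: the event is delivered only after the enumerator snapshot.
        for (String event : reader.inFlight) {
            enumerator.handleEvent(event);
        }

        // Step 5: checkpoint 123 completes with the two snapshots taken above.
        return List.of(enumeratorSnapshot, readerSnapshot);
    }

    public static void main(String[] args) {
        List<List<String>> snapshots = replayScenario();
        System.out.println("enumerator snapshot: " + snapshots.get(0));
        System.out.println("reader snapshot:     " + snapshots.get(1));
        System.out.println("consistent: " + snapshots.get(0).equals(snapshots.get(1)));
    }
}
```

In this toy model the reader snapshot contains OperatorEvent 1 while the enumerator snapshot does not, which is exactly the mismatch I am asking about.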