Hello everyone,

in order to find a subject for my diploma thesis, I was at the Flink Forward conference in October. I talked to Aljoscha and some others there, and after the fault tolerance talk on day 2 I arrived at the idea that incremental checkpointing of the state of a process when a barrier arrives would probably be a nice feature for Flink and a good topic for my thesis. This would be especially interesting for very large, e.g. key-value based, states that are necessary for scenarios like decentralised materialised views ([1], [2]). Independently, Asterios from the TU Berlin suggested the same topic to me when I met him. This is nothing new in itself - Apache Samza, for example, also backs up its internal state incrementally, by writing every change to a special Kafka topic from which the state can be restored when something fails. The approach for Flink, however, would rather be an adaptation of the current checkpointing mechanism.
So my main questions are:

* Would incremental checkpoints be an appreciated change, and do you think the necessary effort would fit a diploma thesis?
* Is there already someone working in this area?

I have already put some initial thoughts into how the goal might be achieved.

How to checkpoint:

(I) Memorize which changes have been made since the last checkpoint.
- Pro: Lightweight solution, since only the things that changed need to be compressed and transferred.
- Contra: You would want to support this not only for each "state variable" but also inside them, e.g. for lists, key-value structures, everything. Unfortunately, there doesn't seem to be a way to observe changes made to plain Java collections or objects in general (or is there?). So you would need to use a different collection library or a wrapper around the existing Java standard ones.
- I could imagine the checkpointing working somehow like this:
  (1) The programmer of the (stateful) transformation uses for the OperatorState a (wrapped) collection or other type that implements a certain interface (e.g. "IncrementallyCheckpointed") that demands something like a changesSinceLastCheckpoint() function.
  (2) The Flink runtime checks whether the state implements IncrementallyCheckpointed and, if yes, calls the changesSinceLastCheckpoint() function.
  (3) There would be a need to differentiate between "regular/full" checkpoints of a state and "incremental" ones when transferring the checkpoint to the backup/checkpoint server.

(II) Keep the last state and compute a diff (preferably against the already serialised checkpoint).
- Pro: Much simpler solution; doesn't need wrapping or adapting of collections or other types; very general approach. The transferred data shouldn't be more than in case (I) - maybe in some cases even less.
- Contra: Would usually almost double the memory needs of the transformation; for large collections, computing the diff would also mean quite some processing effort.

(III?)
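To make steps (1) and (2) of approach (I) a bit more concrete, here is a minimal Java sketch of what such a wrapped collection could look like. Everything here is made up for illustration - the interface and method names are just the ones proposed above, not existing Flink API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical interface, named after the proposal above.
interface IncrementallyCheckpointed<D> {
    // Returns what changed since the last call and resets the change log.
    D changesSinceLastCheckpoint();
}

// A wrapper around a plain HashMap that records which keys were touched
// since the last checkpoint. Only put/remove/get are shown; a real
// wrapper would have to cover the whole Map interface.
class TrackedMap<K, V> implements IncrementallyCheckpointed<Map<K, V>> {
    private final Map<K, V> data = new HashMap<>();
    private final Set<K> dirtyKeys = new HashSet<>();

    public void put(K key, V value) {
        data.put(key, value);
        dirtyKeys.add(key);
    }

    public V remove(K key) {
        dirtyKeys.add(key);
        return data.remove(key);
    }

    public V get(K key) {
        return data.get(key);
    }

    @Override
    public Map<K, V> changesSinceLastCheckpoint() {
        Map<K, V> delta = new HashMap<>();
        for (K key : dirtyKeys) {
            // A null value acts as a tombstone for a removed key.
            delta.put(key, data.get(key));
        }
        dirtyKeys.clear();
        return delta;
    }
}

class Main {
    public static void main(String[] args) {
        TrackedMap<String, Integer> state = new TrackedMap<>();
        state.put("a", 1);
        state.put("b", 2);
        // First barrier: both entries are new, so the delta has size 2.
        System.out.println(state.changesSinceLastCheckpoint().size());
        state.put("a", 3);
        // Second barrier: only "a" was touched, so the delta has size 1.
        System.out.println(state.changesSinceLastCheckpoint().size());
    }
}
```

The runtime would then only serialise and ship the delta map at each barrier; a complete wrapper would of course also have to intercept putAll(), clear(), iterators, and so on.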
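Approach (II) could look roughly like this - again just a sketch with made-up names, diffing plain in-memory maps rather than the serialised bytes:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of approach (II): keep a copy of the state from the last
// checkpoint and diff it against the current state when a barrier
// arrives. Diffing the already serialised checkpoint instead would
// follow the same pattern.
class StateDiffer {

    // Returns new and updated entries; removed keys map to null.
    static <K, V> Map<K, V> diff(Map<K, V> previous, Map<K, V> current) {
        Map<K, V> delta = new HashMap<>();
        for (Map.Entry<K, V> e : current.entrySet()) {
            V old = previous.get(e.getKey());
            if (old == null || !old.equals(e.getValue())) {
                delta.put(e.getKey(), e.getValue()); // added or updated
            }
        }
        for (K key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.put(key, null); // tombstone for a removed key
            }
        }
        return delta;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = new HashMap<>();
        previous.put("a", 1);
        previous.put("b", 2);
        Map<String, Integer> current = new HashMap<>();
        current.put("a", 1); // unchanged
        current.put("b", 3); // updated
        current.put("c", 4); // added
        System.out.println(diff(previous, current)); // only "b" and "c" appear
    }
}
```

This also shows where the memory overhead of (II) comes from: previous is a full copy of the state that has to be kept around between barriers.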
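With either approach, the backup/checkpoint server would then have to rebuild the state from the last full checkpoint plus the subsequent deltas, which is where the full-vs-incremental metadata from (3) comes in. A made-up sketch of that receiving side:

```java
import java.util.HashMap;
import java.util.Map;

// Checkpoint kinds as proposed above: the transfer protocol would need
// at least this bit of metadata. All names here are invented.
enum CheckpointKind { FULL, INCREMENTAL }

// Sketch of the checkpoint-server side: it reconstructs the current
// state from the last full checkpoint plus the deltas received since.
class CheckpointStore<K, V> {
    private final Map<K, V> state = new HashMap<>();

    public void receive(CheckpointKind kind, Map<K, V> payload) {
        if (kind == CheckpointKind.FULL) {
            state.clear(); // a full checkpoint replaces everything
        }
        for (Map.Entry<K, V> e : payload.entrySet()) {
            if (e.getValue() == null) {
                state.remove(e.getKey()); // tombstone: key was removed
            } else {
                state.put(e.getKey(), e.getValue());
            }
        }
    }

    public Map<K, V> currentState() {
        return state;
    }
}
```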
Is there another kind of approach you could imagine?

Which parts need to change:
- The checkpointing of the transformation state (but not the restoring of the state - this stays the same).
- The protocol for transferring the checkpoints needs at least some metadata (full/regular checkpoint vs. incremental).
- The checkpoint server needs to be able to update its current state from the diffs/changes it receives.

I would really appreciate help with and assessment of these ideas and the general subject. Also, if someone could give me a quick overview of the details of the current checkpointing (and which parts of the code are worth exploring), I'd be happy about that too!

Thanks in advance,
Marius