Hello everyone,

while looking for a subject for my diploma thesis, I attended the Flink
Forward conference in October. I talked to Aljoscha and some others
there, and after the Fault Tolerance talk on day 2 I arrived at the idea
that incremental checkpointing of a process's state when a barrier
arrives would probably be a nice feature for Flink and a good topic for
my thesis. This would be especially interesting for very large states,
e.g. key-value based ones, which are necessary for scenarios like
decentralised materialised views ([1], [2]). Independently, Asterios
from TU Berlin suggested the same topic to me when I met him. The idea
is nothing new: Apache Samza, for example, also backs up its internal
state incrementally by writing every change to a special Kafka topic
from which the state can be restored after a failure. The approach for
Flink would rather be an adaptation of the current checkpointing
mechanism.

So my main questions are:
* would incremental checkpoints be an appreciated change, and do you
think the necessary effort would fit a diploma thesis?
* is someone already working in this area?

I already put some initial thoughts into how it might be possible to
achieve the goal:


How to checkpoint:
(I) Memorize which changes have been made since the last checkpoint
  - Pro: Lightweight solution, since only the things that changed need
to be compressed and transferred
  - Contra: You would want to support this not only for each "state
variable" but also inside them, e.g. for lists, key-value structures,
everything. Unfortunately, there doesn't seem to be a way to observe
changes made to plain Java collections or objects in general (or is
there?). So you would need to use a different collection library or a
wrapper around the existing Java standard ones.
  - I could imagine the checkpointing working somehow like this:
    (1) The programmer of the (stateful) transformation uses for the
OperatorState a (wrapped) collection/other type that implements a
certain interface (e.g. "IncrementallyCheckpointed") that demands
something like a changesSinceLastCheckpoint() function.
    (2) The Flink runtime checks whether the state implements
IncrementallyCheckpointed and, if so, calls the
changesSinceLastCheckpoint() function.
    (3) When transferring a checkpoint to the backup/checkpoint server,
there would be the need to differentiate between "regular/full"
checkpoints of a state and "incremental" ones.
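To make step (1) more concrete, here is a minimal Java sketch of what such an interface and a wrapped collection could look like. The interface name comes from my sketch above; everything else (TrackedMap, the null-value-as-tombstone convention) is a hypothetical illustration, not existing Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface from the sketch above, not an existing Flink type.
interface IncrementallyCheckpointed<C> {
    // Returns the changes recorded since the last checkpoint and resets the log.
    C changesSinceLastCheckpoint();
}

// A minimal map wrapper that logs puts/removes between checkpoints.
class TrackedMap<K, V> implements IncrementallyCheckpointed<Map<K, V>> {
    private final Map<K, V> state = new HashMap<>();
    private final Map<K, V> changes = new HashMap<>(); // null value = removed

    public void put(K key, V value) {
        state.put(key, value);
        changes.put(key, value);
    }

    public void remove(K key) {
        state.remove(key);
        changes.put(key, null); // tombstone marking a deletion
    }

    public V get(K key) {
        return state.get(key);
    }

    @Override
    public Map<K, V> changesSinceLastCheckpoint() {
        Map<K, V> delta = new HashMap<>(changes);
        changes.clear();
        return delta;
    }
}
```

The runtime in step (2) would then only need an instanceof check on the state object to decide between a full snapshot and sending the returned delta.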

(II) Keep the last state and compute a diff (preferably against the
already serialised checkpoint):
  - Pro: Much simpler solution; doesn't need wrapping or adapting of
collections or other types; very general approach; the transferred data
shouldn't be more than in case (I) - maybe in some cases even less.
  - Contra: Would usually almost double the memory needs of the
transformation, and for large collections computing the diff would also
mean quite some processing effort.
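As a rough illustration of approach (II) on a map-shaped state (the class and method names here are made up for this sketch): the operator keeps a copy of the state as of the last checkpoint and, at the next barrier, emits only the entries that were added, updated, or removed:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative diff of the current state against the previously
// checkpointed copy; the map-shaped state is an assumption.
class StateDiffer<K, V> {
    private Map<K, V> lastCheckpoint = new HashMap<>();

    // Returns the delta (null value = entry was removed) and remembers the
    // current state as the new baseline - this copy is the extra memory cost.
    public Map<K, V> diffAndCheckpoint(Map<K, V> current) {
        Map<K, V> delta = new HashMap<>();
        for (Map.Entry<K, V> e : current.entrySet()) {
            if (!Objects.equals(lastCheckpoint.get(e.getKey()), e.getValue())) {
                delta.put(e.getKey(), e.getValue()); // added or updated
            }
        }
        for (K key : lastCheckpoint.keySet()) {
            if (!current.containsKey(key)) {
                delta.put(key, null); // removed since last checkpoint
            }
        }
        lastCheckpoint = new HashMap<>(current);
        return delta;
    }
}
```

Diffing the serialised byte stream instead would avoid keeping a second object graph alive, at the cost of a less structured delta.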

(III?) Is there another kind of approach you could imagine?

Which parts need change:
  - The checkpointing of the transformation state (but not the restoring
of the state, which stays the same)
  - The protocol for transferring checkpoints needs at least some
metadata (full/normal checkpoint vs. incremental)
  - The checkpoint server needs to be able to update its current state
from the diffs/changes it receives
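The checkpoint-server side of the last two points could be sketched like this (the metadata flag and the null-value-means-removed convention are assumptions of this sketch, not existing Flink code):

```java
import java.util.HashMap;
import java.util.Map;

// Assumed metadata distinguishing the two checkpoint kinds on the wire.
enum CheckpointKind { FULL, INCREMENTAL }

// The checkpoint server keeps the latest snapshot per state and either
// replaces it wholesale or patches it with a received delta.
class CheckpointStore<K, V> {
    private final Map<K, V> snapshot = new HashMap<>();

    public void receive(CheckpointKind kind, Map<K, V> payload) {
        if (kind == CheckpointKind.FULL) {
            snapshot.clear();
            snapshot.putAll(payload);
        } else {
            for (Map.Entry<K, V> e : payload.entrySet()) {
                if (e.getValue() == null) {
                    snapshot.remove(e.getKey()); // tombstone: entry was removed
                } else {
                    snapshot.put(e.getKey(), e.getValue());
                }
            }
        }
    }

    public Map<K, V> current() {
        return snapshot;
    }
}
```

Restoring stays unchanged because the server always materialises a full snapshot from the deltas, so recovery reads one complete state as before.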


I would really appreciate feedback on these ideas and on the general
subject. Also, if someone could give me a quick overview of the current
checkpointing mechanism (and which parts of the code are worth
exploring), I'd be happy about that too!

Thanks in advance,
Marius
