Hi,
Stefan and I are currently working on preparing our state infrastructure
for the introduction of key-grouped state. This is the parent issue for
key-grouped state https://issues.apache.org/jira/browse/FLINK-3755 while
this is the specific issue that we are currently working on
https://issues.apache.org/jira/browse/FLINK-4381.

We are at a point where we have a reasonable implementation that we think
is solid and quite future proof. Unfortunately, it would break
compatibility of Savepoints between versions before our code and versions
with our changes. We would like to discuss how to proceed, since this has
the potential to affect a lot of people. I'll first try to explain the
current state of state (pun intended) and then give an overview of the
changes that we currently have.

In the current version (Flink 1.1, master ...) the state that an operator
sends to the CheckpointCoordinator is a black box (serialized using Java
Serialization). The checkpoint coordinator stores it, and when a job is
restarted it sends these black boxes back to the tasks, which know how to
read them again. The serialized object that a task sends as state roughly
looks like this:

class StreamTaskStateList {
  StreamTaskState[] states;
}

class StreamTaskState {
  StateHandle<?> operatorState;
  StateHandle<Serializable> functionState;
  HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
}

The kv states are a map from state name to a snapshot of a keyed state (all
of it). The other fields are more black boxes of state that were serialized
using Java Serialization.
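To make the black-box nature concrete, here is a minimal sketch of the Java
Serialization round trip, using simplified stand-in classes (not the real
Flink types). The point is that restoring only works as long as the classes
on the classpath still match the layout that was written:

```java
import java.io.*;
import java.util.HashMap;

// Simplified stand-ins for the real Flink state classes, for illustration.
class KvSnapshot implements Serializable {
    String contents;
    KvSnapshot(String contents) { this.contents = contents; }
}

class TaskState implements Serializable {
    HashMap<String, KvSnapshot> kvStates = new HashMap<>();
}

public class BlackBoxDemo {

    // Round-trips a state object through Java Serialization, the way the
    // CheckpointCoordinator treats task state as an opaque byte blob.
    static TaskState roundTrip(TaskState state) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(state);
        }
        // The coordinator only ever stores these bytes. On restore, the
        // task deserializes them, which only succeeds if the classes on
        // the classpath are still compatible with what was written.
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (TaskState) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TaskState state = new TaskState();
        state.kvStates.put("counts", new KvSnapshot("..."));
        TaskState restored = roundTrip(state);
        System.out.println(restored.kvStates.keySet());
    }
}
```

Any incompatible change to these classes (or a serialVersionUID mismatch)
makes the stored bytes unreadable, which is exactly the brittleness we are
running into.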

Our current code sends this around between task and CheckpointCoordinator:

StreamStateHandle[] chainedStates;
KeyGroupsStateHandle keyGroupsState;

class KeyGroupsStateHandle {
  Map<Integer, Long> keyGroups; // this is the index
  StreamStateHandle data; // this is the actual state for each key group
}

The index is used for finding the contents for a specific key group in the
stream. The chained states are the state for each operator in the chain,
written to a stream. (This contains both operator state and the function
state). This representation allows us to break the chain of operators if we
want to in the future because we have the state of each operator separately
and the checkpoint coordinator is aware of it. The key-group state
representation allows the checkpoint coordinator to re-assign the key
groups to operators upon restore. We should also mention that all of the
state on the CheckpointCoordinator side will be serialized using our code,
not Java serialization. This should allow the possibility of schema
evolution in the future.
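To illustrate the re-assignment idea, here is a hypothetical sketch of how
key groups might be partitioned across a changed parallelism on restore.
The contiguous-range scheme and all names are illustrative, not the actual
implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of re-assigning key groups to operator subtasks on
// restore with a changed parallelism. The contiguous-range partitioning
// and all names here are illustrative, not the actual Flink code.
public class KeyGroupReassignment {

    // Splits key groups 0..numKeyGroups-1 into contiguous ranges, one per
    // subtask of the restored operator.
    static List<List<Integer>> assign(int numKeyGroups, int parallelism) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int kg = 0; kg < numKeyGroups; kg++) {
            assignment.get(kg * parallelism / numKeyGroups).add(kg);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 8 key groups restored at parallelism 3. Each subtask then uses
        // the Map<Integer, Long> index in its KeyGroupsStateHandle to seek
        // to the bytes for exactly its key groups.
        System.out.println(assign(8, 3)); // [[0, 1, 2], [3, 4, 5], [6, 7]]
    }
}
```

Because the index tells us where each key group's bytes start, a subtask
only has to read the slices of the stream for the groups it was assigned.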

The problem now is that things break when we try to restore from a
savepoint in the old format on a system that uses the new format. The only
solution I see right now is to keep all the old state classes as they are,
create a new hierarchy for Flink 1.2, and, when restoring from a pre-1.2
savepoint, manually tease out the old state and put it into the new
representation. This will get nasty very quickly; for example, think about
the state backends, where we would essentially need two versions and code
that can read the old state and then funnel it into the new-version state
backend somehow. As I said, it's not impossible, but it is very involved.
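To give a feeling for how involved this gets, here is a very rough sketch
of such a conversion. All classes, names, and the put-everything-in-one-
key-group trick are hypothetical simplifications, not a real migration
plan:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Rough sketch of the migration path described above: keep the old state
// classes, read the pre-1.2 savepoint with them, and funnel the contents
// into the new representation. All names are hypothetical simplifications.
class LegacyKvSnapshot {
    byte[] bytes;
    LegacyKvSnapshot(byte[] bytes) { this.bytes = bytes; }
}

class NewKeyGroupsStateHandle {
    Map<Integer, Long> keyGroups = new HashMap<>(); // key group -> offset
    byte[] data;                                    // concatenated state
}

public class SavepointMigration {

    // Pre-1.2 state is not split by key group, so a converter cannot do
    // real re-grouping without knowing the keys; here everything lands in
    // a single key group, which is part of why this gets nasty.
    static NewKeyGroupsStateHandle migrate(Map<String, LegacyKvSnapshot> old)
            throws IOException {
        NewKeyGroupsStateHandle handle = new NewKeyGroupsStateHandle();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        handle.keyGroups.put(0, 0L); // everything in key group 0
        for (LegacyKvSnapshot snapshot : old.values()) {
            out.write(snapshot.bytes);
        }
        handle.data = out.toByteArray();
        return handle;
    }

    public static void main(String[] args) throws IOException {
        Map<String, LegacyKvSnapshot> old = new HashMap<>();
        old.put("counts", new LegacyKvSnapshot(new byte[] {1, 2, 3}));
        NewKeyGroupsStateHandle migrated = migrate(old);
        System.out.println(migrated.data.length);
    }
}
```

And this sketch ignores the hard part entirely: the state backends would
still need code that understands both formats.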

By the way, this problem of code versions is not restricted to savepoints:
every class that could be involved in loading state written by earlier
versions essentially has to be kept as it is forever. (Or we teach
operators how to load state from earlier versions as well.)

So, what do you think about this?

Cheers,
Stefan & Aljoscha
