Hi, Stefan and I are currently working on preparing our state infrastructure for the introduction of key-grouped state. The parent issue for key-grouped state is https://issues.apache.org/jira/browse/FLINK-3755, while the specific issue we are currently working on is https://issues.apache.org/jira/browse/FLINK-4381.
We are at a point where we have an implementation that we consider reasonable and quite future proof. Unfortunately, it would break savepoint compatibility between versions before our change and versions after it. We would like to discuss how to proceed, since this has the potential to affect a lot of people. I'll first try to explain the current state of state (pun intended) and then give an overview of the changes we currently have.

In the current version (Flink 1.1, master, ...) the state that an operator sends to the CheckpointCoordinator is a black box (serialized using Java serialization). The checkpoint coordinator stores it, and when a job is restarted it sends these black boxes back to the tasks, which know how to read them again. The serialized object that a task sends as state roughly looks like this:

    class StreamTaskStateList {
        StreamTaskState[] states;
    }

    class StreamTaskState {
        StateHandle<?> operatorState;
        StateHandle<Serializable> functionState;
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
    }

The kv states are a map from state name to a snapshot of a keyed state (all of it). The other fields are more black boxes of state that was serialized using Java serialization.

Our current code sends this around between task and CheckpointCoordinator:

    StreamStateHandle[] chainedStates;
    KeyGroupsStateHandle keyGroupsState;

    class KeyGroupsStateHandle {
        Map<Integer, Long> keyGroups;  // this is the index
        StreamStateHandle data;        // this is the actual state for each key group
    }

The index is used for finding the contents of a specific key group in the stream. The chained states are the state of each operator in the chain, written to a stream (this contains both the operator state and the function state). This representation allows us to break the chain of operators in the future if we want to, because we have the state of each operator separately and the checkpoint coordinator is aware of it.
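To make the index idea concrete, here is a small self-contained sketch (not the actual Flink classes; the names `KeyGroupsHandle`, `snapshot` and `restore` are made up for illustration). It shows how a map from key-group id to byte offset lets a reader seek to exactly one key group's bytes inside a single stream, which is what enables handing individual key groups to different subtasks on restore:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the KeyGroupsStateHandle idea from above.
public class KeyGroupIndexSketch {

    // Mirrors the shape described in the mail: an index from key-group id
    // to the byte offset where that group's data starts, plus the stream.
    static class KeyGroupsHandle {
        final TreeMap<Integer, Long> keyGroups = new TreeMap<>(); // key group -> offset
        byte[] data;                                              // the "stream"
    }

    // Snapshot: write each key group's bytes consecutively into one stream,
    // remembering the start offset of every group in the index.
    static KeyGroupsHandle snapshot(Map<Integer, byte[]> stateByKeyGroup) {
        KeyGroupsHandle handle = new KeyGroupsHandle();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(stateByKeyGroup).entrySet()) {
            handle.keyGroups.put(e.getKey(), (long) out.size());
            out.write(e.getValue(), 0, e.getValue().length);
        }
        handle.data = out.toByteArray();
        return handle;
    }

    // Restore a single key group by seeking to its offset; the group ends
    // where the next group's offset begins (or at the end of the stream).
    static byte[] restore(KeyGroupsHandle handle, int keyGroup) {
        long start = handle.keyGroups.get(keyGroup);
        Map.Entry<Integer, Long> next = handle.keyGroups.higherEntry(keyGroup);
        long end = next == null ? handle.data.length : next.getValue();
        return Arrays.copyOfRange(handle.data, (int) start, (int) end);
    }

    public static void main(String[] args) {
        Map<Integer, byte[]> state = new TreeMap<>();
        state.put(0, "group-0".getBytes());
        state.put(3, "group-3".getBytes());
        state.put(7, "group-7".getBytes());

        KeyGroupsHandle handle = snapshot(state);
        // A subtask that is assigned key group 3 seeks into the stream
        // using the index, without touching the other groups.
        System.out.println(new String(restore(handle, 3))); // prints "group-3"
    }
}
```

Note that nothing in the sketch requires the reader to be the same subtask that wrote the data, which is the whole point of keeping the index on the coordinator side.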
The key-group state representation allows the checkpoint coordinator to re-assign key groups to operators upon restore. We should also mention that all of the state on the CheckpointCoordinator side will be serialized using our own code, not Java serialization. This should make schema evolution possible in the future.

The problem now is that things break when we try to restore a savepoint in the old format on a system that uses the new format. The only solution that I see right now is to keep all the old state classes as they are, create a new hierarchy for Flink 1.2, and, when restoring from a pre-1.2 savepoint, manually tease out the old state and put it into the new representation. This will get nasty very quickly. For example, think about the state backends: we would basically need two versions of each and code that can read the old state and then funnel it into the new-version state backend somehow. As I said, it's not impossible, but it is very involved.

By the way, this versioning problem is not restricted to savepoints: every class that might be loaded from state written by earlier versions essentially has to be kept as it is forever (or operators have to be taught how to load state from earlier versions as well).

So, what do you think about this?

Cheers,
Stefan & Aljoscha
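P.S. To illustrate the re-assignment of key groups mentioned above: the mail does not spell out the assignment strategy, but one plausible scheme (an assumption for illustration, not necessarily what ends up in Flink) is to give each parallel subtask a contiguous range of key groups, so that rescaling only moves whole groups between subtasks:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of redistributing key groups when parallelism changes.
public class KeyGroupAssignmentSketch {

    // Contiguous-range assignment (an assumed strategy): subtask i of p
    // gets roughly numberOfKeyGroups / p consecutive key groups. The union
    // over all subtasks always covers every key group exactly once.
    static List<Integer> keyGroupsForSubtask(int numberOfKeyGroups, int parallelism, int subtaskIndex) {
        List<Integer> groups = new ArrayList<>();
        int start = subtaskIndex * numberOfKeyGroups / parallelism;
        int end = (subtaskIndex + 1) * numberOfKeyGroups / parallelism;
        for (int kg = start; kg < end; kg++) {
            groups.add(kg);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 8 key groups: at parallelism 2 each subtask owns 4 groups; after
        // rescaling to parallelism 4 each subtask owns 2 of the same groups.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> " + keyGroupsForSubtask(8, 4, subtask));
        }
    }
}
```

On restore, the coordinator would hand each subtask the KeyGroupsStateHandles covering its range, and the subtask seeks out its groups via the index.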