Hi Aljoscha,

I'm not very deep into the state backend implementation. However, I
think a breaking change is unavoidable with the new key groups. The
only way to achieve backwards compatibility would be to include a
translator from the old state format to the new one. As you already
mentioned, this is quite involved and we would have to maintain this
translator at least until 2.0.

I'm with Gyula; I think we can afford to break this once for 1.2 and
maintain backwards compatibility afterwards. We should ask some
production users and get their feedback on the problem.

Cheers,
Max

On Thu, Aug 11, 2016 at 5:23 PM, Gyula Fóra <gyf...@apache.org> wrote:
> Hi,
>
> I think this is a very important change for the future of the system, as
> it provides a much cleaner internal representation of state.
>
> You are right that this can in theory break programs written in 1.1 when
> upgraded to 1.2, but I wonder whether this will actually be a practical
> problem in the companies using Flink. If it is very important to keep the
> job running, they can delay upgrading to 1.2 and find a way to restart the
> job. This will probably come in handy on that rainy Sunday when every
> system suddenly breaks :)
>
> I know this might be a stupid way to look at it, but adding all the
> plumbing code for compatibility might actually cause more issues in the
> long run than it gains us.
>
> Just my thoughts :)
>
> Regards,
> Gyula
>
>
> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, Aug 11, 2016, at
> 16:52):
>
>> Hi,
>> Stefan and I are currently working on preparing our state infrastructure
>> for the introduction of key-grouped state. The parent issue for key-grouped
>> state is https://issues.apache.org/jira/browse/FLINK-3755, while the
>> specific issue we are currently working on is
>> https://issues.apache.org/jira/browse/FLINK-4381.
>>
>> We are at a point where we have a reasonable implementation that we think
>> is solid and quite future-proof. Unfortunately, it would break
>> compatibility of savepoints between versions before our code and versions
>> with our changes. We would like to discuss how to proceed with this, since
>> it has the potential to affect a lot of people. I'll first try to explain
>> the current state of state (pun intended) and then give an overview of the
>> changes that we currently have.
>>
>> In the current version (Flink 1.1, master, ...) the state that an operator
>> sends to the CheckpointCoordinator is a black box (serialized using Java
>> serialization). The checkpoint coordinator stores it, and when a job is
>> restarted it sends these black boxes back to the tasks, which know how to
>> read them again. The serialized object that a task sends as state roughly
>> looks like this:
>>
>> class StreamTaskStateList {
>>   StreamTaskState[] states; // one entry per operator in the task's chain
>> }
>>
>> class StreamTaskState {
>>   StateHandle<?> operatorState;            // black box (Java serialization)
>>   StateHandle<Serializable> functionState; // black box (Java serialization)
>>   HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates; // state name -> snapshot
>> }
>>
>> The kv states are a map from state name to a snapshot of a keyed state (all
>> of it). The other fields are more black boxes of state that were serialized
>> using Java serialization.
>>
>> Our current code sends this around between task and CheckpointCoordinator:
>>
>> StreamStateHandle[] chainedStates // state of each operator in the chain
>> KeyGroupsStateHandle keyGroupsState
>>
>> class KeyGroupsStateHandle {
>>   Map<Integer, Long> keyGroups; // the index: key group id -> offset in the stream
>>   StreamStateHandle data;       // the actual state for each key group
>> }
>>
>> The index is used for finding the contents of a specific key group in the
>> stream. The chained states are the state for each operator in the chain,
>> written to a stream (this contains both the operator state and the function
>> state). This representation allows us to break up a chain of operators in
>> the future, because we have the state of each operator separately and the
>> checkpoint coordinator is aware of it. The key-group state representation
>> allows the checkpoint coordinator to re-assign the key groups to operators
>> upon restore. We should also mention that all of the state on the
>> CheckpointCoordinator side will be serialized using our own code, not Java
>> serialization. This should allow for schema evolution in the future.
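>>
>> To make the index concrete, here is a minimal sketch of how a restoring
>> task could use it to read only its assigned key groups. The names
>> (restoreKeyGroups, KeyedBackend, readKeyGroup, openInputStream) are
>> hypothetical, for illustration only, not the actual API:
>>
>> void restoreKeyGroups(KeyGroupsStateHandle handle,
>>                       Set<Integer> assignedKeyGroups,
>>                       KeyedBackend backend) throws IOException {
>>   // Open the stream that holds the data of all key groups in this handle.
>>   try (FSDataInputStream in = handle.data.openInputStream()) {
>>     for (int keyGroup : assignedKeyGroups) {
>>       Long offset = handle.keyGroups.get(keyGroup);
>>       if (offset == null) {
>>         continue; // no state was written for this key group
>>       }
>>       in.seek(offset); // jump directly to this key group's contents
>>       backend.readKeyGroup(keyGroup, in); // assumed backend hook
>>     }
>>   }
>> }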
>>
>> The problem now is that things break when we try to restore from a
>> savepoint with the old format on a system that uses the new format. The
>> only solution that I see right now is to keep all the old state classes as
>> they are, create a new hierarchy for Flink 1.2, and, when restoring from a
>> pre-1.2 savepoint, manually tease out the old state and put it into the new
>> representation. This will get nasty very quickly: think, for example, about
>> the state backends, where we would basically have to keep two versions
>> around and have code that can read the old state and then funnel it into
>> the new-version state backend somehow. As I said, it's not impossible, but
>> it is very involved.
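>>
>> To give an idea of the shape of such a translator (all names here are
>> hypothetical, and the real thing would also have to deal with each state
>> backend's format), it might look roughly like this:
>>
>> // Hypothetical one-shot converter from the kept-around pre-1.2 snapshot
>> // classes to the new per-operator representation.
>> class LegacySavepointTranslator {
>>
>>   StreamStateHandle[] translateChainedStates(StreamTaskStateList old) {
>>     StreamStateHandle[] chained = new StreamStateHandle[old.states.length];
>>     for (int i = 0; i < old.states.length; i++) {
>>       // Read the old black boxes with the legacy classes and re-write
>>       // them with the new serialization code (assumed helper).
>>       chained[i] = reserialize(old.states[i].operatorState,
>>                                old.states[i].functionState);
>>     }
>>     return chained;
>>   }
>>
>>   // The kv states are the hard part: the old snapshot holds the whole
>>   // keyed state, which would have to be re-partitioned into key groups.
>> }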
>>
>> By the way, this versioning problem is not restricted to savepoints: every
>> class that can be loaded from data written by earlier code essentially has
>> to be kept as it is forever. (Or we teach operators how to load state from
>> earlier versions as well.)
>>
>> So, what do you think about this?
>>
>> Cheers,
>> Stefan & Aljoscha
>>
