Hi Gordon,

Thanks for your response. Generally speaking, what you wrote in your second mail is what I wanted to do when I first had a look at the TypeSerializer interface, so I think it really makes sense :)
As you said, requiresMigration does not work right now, so I looked for a workaround and found out that ensureCompatibility is called before each state deserialization. I tried using a flag set in that method but, as you described, it does not always work for HeapStateBackend because the state is loaded eagerly with the old serializer. I managed to create two serializers: an "old" one that deserializes only the old format, and a "new" one that can deserialize both formats based on a flag set in ensureCompatibility. Do you think such a strategy is valid for now (as state migration is not there yet)?

You can check my code here, in lines 702-1087:
https://github.com/dawidwys/flink/blob/cep-nfa-serialization/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java

Cheers,
Dawid

> On 7 Jul 2017, at 15:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Some extra information after understanding your case a bit more:
>
> What you would need to do is return a
> `CompatibilityResult#requiresMigration(convertSerializer)`, where the
> `convertSerializer` is a serializer that does read the extra information.
> The `convertSerializer` will only ever be used to read the old data, and the
> new serializer is used to serialize state so that it is written in the new
> format.
>
> Unfortunately, if you return `requiresMigration()`, the job will currently
> always fail since this is not supported yet. State migration is
> currently blocked by [1].
>
> I’m happy to hear feedback regarding the new compatibility / config snapshot
> methods on TypeSerializer.
> Let me know if all of this makes sense to you :)
>
> Cheers,
> Gordon
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-22-Eager-State-Declaration-td18562.html
>
>
> On 7 July 2017 at 9:23:32 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:
>
> Hi Dawid,
>
> First of all, one thing to clarify: TypeSerializer#ensureCompatibility is
> invoked on the newly provided state serializer.
> Also, a reconfigured compatible serializer should NOT have different
> serialization formats (that would require state migration, i.e. return
> CompatibilityResult#requiresMigration).
> A reconfigured serializer will continue to be used by Flink as if nothing
> has changed, can read old AND new data, and still writes in the same format.
> In your case, I think you may be interpreting “a reconfigured serializer”
> incorrectly.
>
> Unfortunately it does not work for HeapStateBackend as during restoring the
> StateBackend the method TypeSerializer#ensureCompatibility is not invoked and
> the state value is eagerly deserialized with the not reconfigured serializer.
> For the HeapStateBackend, as of now, this is expected. The main reason for
> this is that currently, new state serializers are provided lazily (i.e. when
> the state descriptor is registered). There is no new serializer available to
> be confronted / reconfigured with the previous TypeSerializerConfigSnapshot
> at restore time.
>
> Therefore, for HeapStateBackend, we are using the old serialized serializer
> (not invoked with #ensureCompatibility) to read everything to state objects.
> This should always work as long as the old serializer can be deserialized
> properly.
>
> Cheers,
> Gordon
>
> On 7 July 2017 at 5:57:56 PM, Dawid Wysakowicz (wysakowicz.da...@gmail.com)
> wrote:
>
> Hi devs,
>
> Currently I am working on some changes to the serializer for the NFA class in
> the CEP library. I am trying to understand how the
> TypeSerializer#ensureCompatibility feature works.
>
> The situation is that in a previous version (e.g. 1.3.0) some information
> was serialized that no longer should be. In
> TypeSerializer#ensureCompatibility I am setting a flag, based on the
> corresponding ConfigSnapshot version, that tells me if that additional info
> should be read.
>
> So let’s get to the point :). Unfortunately it does not work for
> HeapStateBackend as during restoring the StateBackend the method
> TypeSerializer#ensureCompatibility is not invoked and the state value is
> eagerly deserialized with the not reconfigured serializer.
> It does work though for RocksDBStateBackend, as while restoring there is no
> deserialization of the value (lazy deserialization). It is first deserialized
> when accessed (getColumnFamily etc., I suppose), and then the method
> ensureCompatibility is called and the serializer is properly reconfigured.
>
> My questions are:
> - is my serialization plan ok, with setting the flag?
> - are the different behaviours intended, or is it a bug for HeapStateBackend?
>
> If it is a bug, I would be willing to fix it (or at least try), but probably I
> will need some guidance.
>
> Regards
> Dawid
>
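
The flag-based workaround discussed in this thread can be sketched roughly as follows. This is a minimal, Flink-free illustration and not the code from the linked NFA.java: the class `FlagBasedSerializerSketch`, its version constants, and the `writeLegacy` helper are all hypothetical, and the int-based `ensureCompatibility` only stands in for the config-snapshot check on the real TypeSerializer. It assumes, purely for illustration, that the legacy format is a string followed by one obsolete int.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a state serializer; this is NOT Flink's TypeSerializer API. */
class FlagBasedSerializerSketch {

    static final int LEGACY_VERSION = 1;  // assumed format: string + obsolete int
    static final int CURRENT_VERSION = 2; // assumed format: string only

    // Set by the compatibility check; false means "read the current format".
    private boolean readLegacyExtraField = false;

    /** Mimics the idea of ensureCompatibility: inspect the snapshot version, set the flag. */
    void ensureCompatibility(int snapshotVersion) {
        readLegacyExtraField = snapshotVersion < CURRENT_VERSION;
    }

    /** Always writes the current format, so the obsolete field is never persisted again. */
    byte[] serialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either format; the obsolete int is consumed and discarded when the flag is set. */
    String deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String value = in.readUTF();
            if (readLegacyExtraField) {
                in.readInt(); // obsolete field from the legacy format
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Produces legacy-format bytes; exists only to simulate state written by an old job. */
    static byte[] writeLegacy(String value, int obsoleteField) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
            out.writeInt(obsoleteField);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        FlagBasedSerializerSketch serializer = new FlagBasedSerializerSketch();
        serializer.ensureCompatibility(LEGACY_VERSION); // restoring from an old snapshot
        String restored = serializer.deserialize(writeLegacy("nfa-state", 42));
        System.out.println(restored + " re-written as " + serializer.serialize(restored).length + " bytes");
    }
}
```

As noted in the thread, this kind of flag only helps where the compatibility check runs before the bytes are read (the RocksDBStateBackend case described above); with the HeapStateBackend the state is deserialized eagerly by the old serializer. The flag is also per serializer instance, so bytes already written in the current format must not be read while it is set.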