Hi Gordon,

Thanks for your response. When I first had a look at the TypeSerializer 
interface, what you describe in your second mail is pretty much what I wanted 
to do, so I think it really makes sense :)

Since, as you said, requiresMigration does not work right now, I looked for a 
workaround and found out that ensureCompatibility is called before each state 
deserialization. So I tried using a flag set in that method, but as you 
described, it does not always work for the HeapStateBackend, as the state is 
loaded eagerly with the old serializer. I managed to create two serializers: an 
"old" one that deserialises only the old format, and a "new" one that can 
deserialise both formats, based on a flag set in ensureCompatibility. Do you 
think such a strategy is valid for now (as state migration is not supported 
yet)?
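
To make the two-serializer strategy concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink's actual TypeSerializer API: the classes `Layouts`, `OldFormatSerializer`, `NewFormatSerializer` and `TwoSerializersDemo`, the byte layouts, and an `ensureCompatibility(int)` that only looks at a snapshot version are all hypothetical simplifications, assuming the old format wrote one extra `long` after each value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical byte layouts for one String value:
//   old (version 1): [int length][UTF-8 bytes][long extraInfo]
//   new (version 2): [int length][UTF-8 bytes]
final class Layouts {
    static final int CURRENT_VERSION = 2;

    static void writeString(ByteBuffer buf, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(utf8.length).put(utf8);
    }

    static String readString(ByteBuffer buf) {
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Produces bytes the way the previous version would have written them.
    static byte[] writeV1(String s, long extraInfo) {
        int size = Integer.BYTES + s.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
        ByteBuffer buf = ByteBuffer.allocate(size);
        writeString(buf, s);
        buf.putLong(extraInfo);
        return buf.array();
    }
}

// "Old" serializer: understands only the version-1 layout.
final class OldFormatSerializer {
    String deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String value = Layouts.readString(buf);
        buf.getLong(); // extra info that the new version no longer needs
        return value;
    }
}

// "New" serializer: always writes version 2, reads either layout depending on a flag.
final class NewFormatSerializer {
    private boolean readExtraInfo = false;

    // Stand-in for TypeSerializer#ensureCompatibility; only the version of the
    // previous config snapshot is modelled.
    void ensureCompatibility(int previousSnapshotVersion) {
        readExtraInfo = previousSnapshotVersion < Layouts.CURRENT_VERSION;
    }

    byte[] serialize(String value) {
        int size = Integer.BYTES + value.getBytes(StandardCharsets.UTF_8).length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        Layouts.writeString(buf, value);
        return buf.array();
    }

    String deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String value = Layouts.readString(buf);
        if (readExtraInfo) {
            buf.getLong(); // skip the field only the old layout contains
        }
        return value;
    }
}

final class TwoSerializersDemo {
    public static void main(String[] args) {
        byte[] oldBytes = Layouts.writeV1("event-a", 42L);
        NewFormatSerializer serializer = new NewFormatSerializer();
        serializer.ensureCompatibility(1); // restored from a version-1 snapshot
        System.out.println(new OldFormatSerializer().deserialize(oldBytes)); // event-a
        System.out.println(serializer.deserialize(oldBytes));                // event-a
    }
}
```

Note that while the flag is set, this sketch reads every value in the old layout, so it only models the restore path; values written afterwards in the new layout would be misread, which is the kind of mixed-format behaviour Gordon's reply cautions against.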

You can check my code here (lines 702-1087):
https://github.com/dawidwys/flink/blob/cep-nfa-serialization/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java

Cheers,
Dawid

> On 7 Jul 2017, at 15:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Some extra information after understanding your case a bit more:
> 
> What you would need to do is return 
> `CompatibilityResult#requiresMigration(convertSerializer)`, where 
> `convertSerializer` is a serializer that can read the extra information.
> The `convertSerializer` will only ever be used to read the old data, while the 
> new serializer is used to serialize state, so that it is written in the new 
> format.
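
A rough plain-Java model of what such a migration is meant to achieve (it was not supported at the time of this thread): the convert serializer only reads the old bytes, and the new serializer writes every value back in the new format. This is a sketch, not Flink's migration code; `SimpleSerializer`, `V1Serializer`, `V2Serializer`, `Migration.migrate` and both byte layouts are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a serializer: whole values to and from byte arrays.
interface SimpleSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Hypothetical old layout: [int length][UTF-8 bytes][long extraInfo]
final class V1Serializer implements SimpleSerializer<String> {
    public byte[] serialize(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        // 0L is a placeholder for the extra information the old version stored
        return ByteBuffer.allocate(Integer.BYTES + utf8.length + Long.BYTES)
                .putInt(utf8.length).put(utf8).putLong(0L).array();
    }

    public String deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        buf.getLong(); // the extra information only the old layout carries
        return new String(utf8, StandardCharsets.UTF_8);
    }
}

// Hypothetical new layout: [int length][UTF-8 bytes]
final class V2Serializer implements SimpleSerializer<String> {
    public byte[] serialize(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + utf8.length).putInt(utf8.length).put(utf8).array();
    }

    public String deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}

final class Migration {
    // The convert serializer is only ever used to read old bytes; every value is
    // then written again with the new serializer, i.e. in the new format.
    static <T> List<byte[]> migrate(List<byte[]> oldValues,
                                    SimpleSerializer<T> convertSerializer,
                                    SimpleSerializer<T> newSerializer) {
        List<byte[]> migrated = new ArrayList<>(oldValues.size());
        for (byte[] bytes : oldValues) {
            migrated.add(newSerializer.serialize(convertSerializer.deserialize(bytes)));
        }
        return migrated;
    }

    public static void main(String[] args) {
        V1Serializer v1 = new V1Serializer();
        List<byte[]> oldValues = Arrays.asList(v1.serialize("a"), v1.serialize("bc"));
        List<byte[]> newValues = migrate(oldValues, v1, new V2Serializer());
        System.out.println(new V2Serializer().deserialize(newValues.get(1))); // bc
    }
}
```

After such a pass, only the new serializer is needed, because no value in the old layout remains.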
> 
> Unfortunately, if you return `requiresMigration()` right now, the job will 
> always fail, since state migration is not supported yet. It is currently 
> blocked by [1].
> 
> I’m happy to hear feedback regarding the new compatibility / config snapshot 
> methods on TypeSerializer.
> Let me know if all of this makes sense to you :)
> 
> Cheers,
> Gordon
> 
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-22-Eager-State-Declaration-td18562.html
> 
> 
> On 7 July 2017 at 9:23:32 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:
> 
> Hi Dawid,
> 
> First of all, one thing to clarify: TypeSerializer#ensureCompatibility is 
> invoked on the newly provided state serializer.
> Also, a reconfigured compatible serializer should NOT have a different 
> serialization format (that would require state migration, i.e. returning 
> CompatibilityResult#requiresMigration).
> Flink continues to use a reconfigured serializer as if nothing has changed: 
> it can read old AND new data, and still writes in the same format.
> In your case, I think you may be interpreting “a reconfigured serializer” 
> incorrectly.
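
One way to picture a reconfigured, compatible serializer is a serializer whose byte format never changes but whose internal tag assignment does. The sketch below is hypothetical plain Java, not Flink code: `TaggedSerializer` identifies each subtype by a numeric tag (its position in a registration list), and its stand-in `ensureCompatibility` reorders the registrations so that tags written before the restore keep their meaning, while types new in this version are appended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical serializer that identifies each subtype by a numeric tag, i.e. by the
// type's position in its registration list. Its (imagined) byte format would be
// [int tag][payload] no matter how the list is ordered; only the tag mapping is modelled.
final class TaggedSerializer {
    private final List<String> registeredTypes;

    TaggedSerializer(List<String> registeredTypes) {
        this.registeredTypes = new ArrayList<>(registeredTypes);
    }

    int tagFor(String typeName) {
        int tag = registeredTypes.indexOf(typeName);
        if (tag < 0) {
            throw new IllegalArgumentException("unregistered type: " + typeName);
        }
        return tag;
    }

    String typeFor(int tag) {
        return registeredTypes.get(tag);
    }

    // Stand-in for a config snapshot: the registration order at checkpoint time.
    List<String> snapshotConfiguration() {
        return new ArrayList<>(registeredTypes);
    }

    // Stand-in for ensureCompatibility. Reorders the registrations so that tags written
    // before the restore keep their meaning and appends types that are new in this version.
    // The format does not change, so the serializer stays compatible after reconfiguration.
    // Returns false if a previously registered type is missing; this sketch treats that
    // case as not reconfigurable.
    boolean ensureCompatibility(List<String> previousRegistrations) {
        if (!registeredTypes.containsAll(previousRegistrations)) {
            return false;
        }
        List<String> reordered = new ArrayList<>(previousRegistrations);
        for (String type : registeredTypes) {
            if (!reordered.contains(type)) {
                reordered.add(type);
            }
        }
        registeredTypes.clear();
        registeredTypes.addAll(reordered);
        return true;
    }

    public static void main(String[] args) {
        TaggedSerializer previous = new TaggedSerializer(Arrays.asList("Start", "Middle", "End"));
        int storedTag = previous.tagFor("End"); // tag 2 was written before the restore
        TaggedSerializer current = new TaggedSerializer(Arrays.asList("End", "Start", "Extra", "Middle"));
        current.ensureCompatibility(previous.snapshotConfiguration());
        System.out.println(current.typeFor(storedTag)); // End
    }
}
```

Both old and new data stay readable because the reconfigured instance keeps writing the same `[tag][payload]` shape; only the meaning of each tag was aligned with the previous snapshot.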
> 
>> Unfortunately, it does not work for the HeapStateBackend, because while the 
>> StateBackend is being restored, TypeSerializer#ensureCompatibility is not 
>> invoked and the state value is eagerly deserialized with the serializer that 
>> has not been reconfigured.
> For the HeapStateBackend, this is currently expected. The main reason is 
> that new state serializers are provided lazily (i.e. when the state 
> descriptor is registered), so at restore time there is no new serializer 
> available to be checked against / reconfigured with the previous 
> TypeSerializerConfigSnapshot.
> 
> Therefore, for the HeapStateBackend, we use the old, serialized serializer 
> (on which #ensureCompatibility is never invoked) to read everything into 
> state objects. This should always work, as long as the old serializer can be 
> deserialized properly.
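
The difference between the two backends can be modelled in a few lines. This hypothetical sketch reduces a serializer to a `Function<byte[], String>` and a backend to a map; `BackendModel`, `HeapBackendModel`, `RocksDbBackendModel` and `RestoreDemo` are illustrative names, not Flink classes. The heap model turns bytes into objects at restore time with the serializer restored from the snapshot, while the RocksDB model keeps raw bytes and deserializes on access with whatever serializer is registered by then.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical models: a "serializer" is just a function from bytes to a value.
interface BackendModel {
    void restore(Map<String, byte[]> snapshot, Function<byte[], String> restoredSerializer);
    String get(String key, Function<byte[], String> registeredSerializer);
}

// Eager restore: bytes become objects immediately, using the serializer stored in the
// snapshot, because no new serializer has been registered yet.
final class HeapBackendModel implements BackendModel {
    private final Map<String, String> objects = new HashMap<>();

    public void restore(Map<String, byte[]> snapshot, Function<byte[], String> restoredSerializer) {
        snapshot.forEach((key, bytes) -> objects.put(key, restoredSerializer.apply(bytes)));
    }

    // The newly registered serializer plays no part in reading already restored objects.
    public String get(String key, Function<byte[], String> registeredSerializer) {
        return objects.get(key);
    }
}

// Lazy restore: only the raw bytes are kept; nothing is deserialized yet.
final class RocksDbBackendModel implements BackendModel {
    private final Map<String, byte[]> rawValues = new HashMap<>();

    public void restore(Map<String, byte[]> snapshot, Function<byte[], String> restoredSerializer) {
        rawValues.putAll(snapshot);
    }

    // Deserialized on access with the serializer registered at that point, i.e. one that
    // has already been checked (and possibly reconfigured) against the old config snapshot.
    public String get(String key, Function<byte[], String> registeredSerializer) {
        return registeredSerializer.apply(rawValues.get(key));
    }
}

final class RestoreDemo {
    public static void main(String[] args) {
        Map<String, byte[]> snapshot = new HashMap<>();
        snapshot.put("k", "v".getBytes(StandardCharsets.UTF_8));
        // Labels show which serializer actually produced the value.
        Function<byte[], String> restored = b -> "restored:" + new String(b, StandardCharsets.UTF_8);
        Function<byte[], String> registered = b -> "registered:" + new String(b, StandardCharsets.UTF_8);

        for (BackendModel backend : new BackendModel[] {new HeapBackendModel(), new RocksDbBackendModel()}) {
            backend.restore(snapshot, restored);
            System.out.println(backend.getClass().getSimpleName() + " -> " + backend.get("k", registered));
        }
    }
}
```

Running the demo prints `restored:v` for the heap model and `registered:v` for the RocksDB model, which mirrors why a flag set in ensureCompatibility is only ever seen by the lazy backend.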
> 
> Cheers,
> Gordon
> 
> On 7 July 2017 at 5:57:56 PM, Dawid Wysakowicz (wysakowicz.da...@gmail.com) wrote:
> 
> Hi devs,
> 
> I am currently working on some changes to the serializer for the NFA class in 
> the CEP library, and I am trying to understand how the 
> TypeSerializer#ensureCompatibility feature works.
> 
> What I want to do: in a previous version (e.g. 1.3.0), some information was 
> serialized that now shouldn't be. In TypeSerializer#ensureCompatibility, I set 
> a flag, based on the version of the corresponding ConfigSnapshot, that tells 
> me whether that additional information should be read.
> 
> So let’s get to the point :). Unfortunately, it does not work for the 
> HeapStateBackend, because while the StateBackend is being restored, 
> TypeSerializer#ensureCompatibility is not invoked and the state value is 
> eagerly deserialized with the serializer that has not been reconfigured.
> It does work for the RocksDBStateBackend, though, as there is no 
> deserialisation of the value while restoring (lazy deserialization). The value 
> is first deserialized when it is accessed (getColumnFamily etc., I suppose), 
> and by then ensureCompatibility has been called and the serializer is properly 
> reconfigured.
> 
> My questions are:
> - Is my serialization plan, i.e. setting the flag, OK?
> - Is the different behaviour intended, or is it a bug in the HeapStateBackend?
> 
> If it is a bug, I would be willing to fix it (or at least try), but I will 
> probably need some guidance.
> 
> Regards
> Dawid
> 
