Hi Dawid,

It would be possible to make this work by reconfiguring the serializer so that 
it can read both the “old” and “new” formats but writes only the new format, 
and then just returning `CompatibilityResult.compatible()`.
Note that this typically isn’t recommended: if you’re using the RocksDB 
backend, which lazily deserializes and eagerly serializes on each state access, 
you’ll end up with mixed formats in the state (i.e., keys that have not yet 
been accessed will still be in the old format).
This would entail a lot of maintenance code just for compatibility in the 
future.
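
To make the mixed-format problem concrete, here is a minimal sketch in plain 
Java. It is not Flink code: the class, the byte layouts, and the marker value 
are all hypothetical, and it assumes an old-format count is never -1 so that 
the two layouts can be told apart per entry.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: every name and byte layout here is hypothetical, not a Flink API.
final class MixedFormatSketch {

    // Assumption: old-format counts are never -1, so -1 can tag the new format.
    static final int NEW_FORMAT_MARKER = -1;

    // Hypothetical old layout: [int count][int length][UTF-8 legacy info].
    static byte[] writeOld(int count, String legacyInfo) {
        byte[] info = legacyInfo.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(8 + info.length)
                .putInt(count).putInt(info.length).put(info).array();
    }

    // Hypothetical new layout: [int marker][int count]; the legacy info is dropped.
    static byte[] writeNew(int count) {
        return ByteBuffer.allocate(8).putInt(NEW_FORMAT_MARKER).putInt(count).array();
    }

    static boolean isNewFormat(byte[] data) {
        return ByteBuffer.wrap(data).getInt() == NEW_FORMAT_MARKER;
    }

    // Reads either layout; the trailing legacy info of the old layout is ignored.
    static int readEither(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int first = in.getInt();
        return first == NEW_FORMAT_MARKER ? in.getInt() : first;
    }

    // Mimics a lazy store: deserialize on access, always write back in the new layout.
    static int accessAndIncrement(Map<String, byte[]> store, String key) {
        int updated = readEither(store.get(key)) + 1;
        store.put(key, writeNew(updated));
        return updated;
    }

    // Both keys start in the old layout; only "a" is accessed afterwards.
    static Map<String, byte[]> demo() {
        Map<String, byte[]> store = new LinkedHashMap<>();
        store.put("a", writeOld(10, "x"));
        store.put("b", writeOld(20, "y"));
        accessAndIncrement(store, "a");
        return store;
    }
}
```

After `MixedFormatSketch.demo()`, key "a" holds new-layout bytes while key "b" 
is still in the old layout, so the dual-format read path has to stay around 
for as long as any untouched key might exist.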

Also note that the above approach doesn’t respect the definition of serializer 
compatibility: two serializers are compatible iff their written binary formats 
are identical. That’s why in your case it is only reasonable to use 
`CompatibilityResult.requiresMigration`.
I see in your code snippet that is what you are doing right now, but then 
again, keep in mind that state migration isn’t available yet.
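
As a rough illustration of that definition, the sketch below compares the 
bytes produced by two hypothetical writers and maps the outcome to the two 
results discussed here. It is plain Java, not the Flink API; the names and 
layouts are made up, and comparing a few sample values only demonstrates the 
rule rather than proving compatibility in general.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.IntFunction;

// Illustrative only: these names are hypothetical and are not Flink APIs.
final class CompatibilitySketch {

    // Stand-ins for the "compatible" and "requires migration" outcomes.
    enum Decision { COMPATIBLE, REQUIRES_MIGRATION }

    // Hypothetical previous writer: [int count][int extraInfo].
    static byte[] writeV1(int count) {
        return ByteBuffer.allocate(8).putInt(count).putInt(0).array();
    }

    // Hypothetical current writer: [int count] only.
    static byte[] writeV2(int count) {
        return ByteBuffer.allocate(4).putInt(count).array();
    }

    // Compatible only if both writers emit identical bytes for every sample value.
    static Decision decide(IntFunction<byte[]> previous,
                           IntFunction<byte[]> current,
                           int... samples) {
        for (int sample : samples) {
            if (!Arrays.equals(previous.apply(sample), current.apply(sample))) {
                return Decision.REQUIRES_MIGRATION;
            }
        }
        return Decision.COMPATIBLE;
    }
}
```

Here `decide(CompatibilitySketch::writeV1, CompatibilitySketch::writeV2, 0, 1, 42)` 
yields `REQUIRES_MIGRATION`, because the written bytes differ even though a 
reader could be taught to understand both layouts.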

Does that answer your question?

Cheers,
Gordon

On 10 July 2017 at 3:38:16 PM, Dawid Wysakowicz (wysakowicz.da...@gmail.com) 
wrote:

Hi Gordon,  

Thanks for your response. Generally speaking, I wanted to do what you wrote in 
the second mail, when I first had a look at the TypeSerializer interface. So I 
think it really makes sense :)  

As you said that requiresMigration does not work right now, I tried looking for 
a workaround and found out that ensureCompatibility is called before 
each state deserialization. So I tried using a flag set in that method, but as 
you described, it does not always work for HeapStateBackend, as the state is 
loaded eagerly with the old serializer. I managed to create two serializers: an 
“old” one that deserializes only the old format, and a “new” one that can 
deserialize both, based on a flag set in ensureCompatibility. Do you think such 
a strategy is valid for now (as state migration is not there yet)?  

You can check my code here, in lines 702-1087:  
https://github.com/dawidwys/flink/blob/cep-nfa-serialization/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
  

Cheers,  
Dawid  

> On 7 Jul 2017, at 15:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:  
>  
> Some extra information after understanding your case a bit more:  
>  
> What you would need to do is return a 
> `CompatibilityResult#requiresMigration(convertSerializer)`, where the 
> `convertSerializer` is a serializer that does read the extra information.  
> The `convertSerializer` will only ever be used to read the old data, and the 
> new serializer is used to serialize state so that it is written in the new 
> format.  
>  
> Unfortunately, if you return `requiresMigration()`, the job will currently 
> always fail, since this is not supported yet. State migration is 
> blocked by [1].  
>  
> I’m happy to hear feedback regarding the new compatibility / config snapshot 
> methods on TypeSerializer.  
> Let me know if all of this makes sense to you :)  
>  
> Cheers,  
> Gordon  
>  
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-22-Eager-State-Declaration-td18562.html
>   
>  
>  
> On 7 July 2017 at 9:23:32 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
> wrote:  
>  
> Hi Dawid,  
>  
> First of all, one thing to clarify: TypeSerializer#ensureCompatibility is 
> invoked on the newly provided state serializer.  
> Also, a reconfigured compatible serializer should NOT have a different 
> serialization format (that would require state migration, i.e. returning 
> CompatibilityResult#requiresMigration).  
> A reconfigured serializer will continue to be used by Flink as if nothing 
> has changed; it can read old AND new data, and still writes in the same format.  
> In your case, I think you may be interpreting “a reconfigured serializer” 
> incorrectly.  
>  
> Unfortunately it does not work for HeapStateBackend, because while restoring the 
> StateBackend the method TypeSerializer#ensureCompatibility is not invoked and 
> the state value is eagerly deserialized with the serializer that has not been reconfigured. 
>  
> For the HeapStateBackend, as of now, this is expected. The main reason for 
> this is that currently, new state serializers are provided lazily (i.e. when 
> the state descriptor is registered). There is no new serializer available to 
> be confronted / reconfigured with the previous TypeSerializerConfigSnapshot 
> at restore time.  
>  
> Therefore, for HeapStateBackend, we are using the old serialized serializer 
> (not invoked with #ensureCompatibility) to read everything to state objects. 
> This should always work as long as the old serializer can be deserialized 
> properly.  
>  
> Cheers,  
> Gordon  
>  
> On 7 July 2017 at 5:57:56 PM, Dawid Wysakowicz (wysakowicz.da...@gmail.com) 
> wrote:  
>  
> Hi devs,  
>  
> Currently I am working on some changes to the serializer for the NFA class in the CEP 
> library. I am trying to understand how the TypeSerializer#ensureCompatibility 
> feature works.  
>  
> What I want to handle is that in a previous version (e.g. 1.3.0) some information 
> was serialized that no longer should be. In TypeSerializer#ensureCompatibility I am 
> setting a flag, based on the corresponding ConfigSnapshot version, that tells me whether 
> that additional info should be read.  
>  
> So let’s get to the point :). Unfortunately it does not work for 
> HeapStateBackend, because while restoring the StateBackend the method 
> TypeSerializer#ensureCompatibility is not invoked and the state value is 
> eagerly deserialized with the serializer that has not been reconfigured.  
> It does work, though, for RocksDBStateBackend, since the value is not 
> deserialized while restoring (lazy deserialization). It is first deserialized 
> when accessed (getColumnFamily etc., I suppose), and then the method 
> ensureCompatibility is called and the serializer is properly reconfigured.  
>  
> My questions are:  
> - Is my serialization plan OK, with setting the flag?  
> - Are the different behaviours intended, or is it a bug in HeapStateBackend?  
>  
> If it is a bug, I would be willing to fix it (or at least try), but I will 
> probably need some guidance.  
>  
> Regards  
> Dawid  
>  
