Thanks Gordon for the suggestion,

I am going by this repo:
https://github.com/mrooding/flink-avro-state-serialization

So far I am able to alter the Scala case classes and restore from a savepoint
using the memory state backend, but when I use RocksDB as the state backend
and try to restore from a savepoint, it breaks with the following error:

org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
        at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
        at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
        at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
        at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
        at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
        at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
        ... 8 more
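
For context, my understanding is that this EOFException usually means the bytes in RocksDB were written with an older Avro schema than the one the decoder is resolving against. A minimal sketch of what I believe the deserialize path needs to do, assuming the writer schema was captured in the serializer snapshot (the method and parameter names here are hypothetical, not from the repo above):

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Sketch: resolve bytes written with writerSchema into the current readerSchema.
// Constructing the reader with only the new schema, e.g.
// new GenericDatumReader[GenericRecord](readerSchema), can run off the end of
// the buffer (EOFException) when the field layouts differ.
def deserializeWithResolution(bytes: Array[Byte],
                              writerSchema: Schema, // schema the savepoint bytes were written with
                              readerSchema: Schema  // schema of the current case class
                             ): GenericRecord = {
  val reader  = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}
```

If the writer schema is not stored in the TypeSerializerSnapshot and handed back to the serializer on restore, the decoder parses the old bytes against the new schema, which would match the EOFException above. This is only my guess at the cause, not a confirmed fix.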




On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Apoorv,
>
> Flink currently does not natively support schema evolution for state types
> using Scala case classes [1].
>
> So, as Roman has pointed out, there are two possible ways for you to do that:
> - Implementing a custom serializer that supports schema evolution for your
> specific Scala case classes, as Roman suggested.
> - or, using the State Processor API [2] to migrate your case classes
> offline as a batch job
>
> For your question on how to implement a schema-evolution supporting
> serializer, can you share with me the problems you have met so far?
> Otherwise, if you take a look at the PojoSerializerSnapshot class, that
> would be a starting point to implement something similar for your case
> classes.
>
> As you will quickly realize, it's not simple, so I would strongly suggest
> trying out the approach of using the State Processor API.
> Either way, if you bump into any problems, feel free to let me know.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-10896
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <
> apoorv.upadh...@razorpay.com> wrote:
>
>> Thanks a lot. Also, can you share an example where this has been
>> implemented? I have gone through the docs, but it still does not work for me.
>>
>> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Apoorv,
>>>
>>> You can achieve this by implementing custom serializers for your state.
>>> Please refer to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
>>> apoorv.upadh...@razorpay.com> wrote:
>>>
>>>> Hi Roman,
>>>>
>>>> I have successfully migrated to Flink 1.8.2 with the savepoint created
>>>> by Flink 1.6.2.
>>>> Now I have to modify a few case classes due to a new requirement. I have
>>>> created a savepoint, and when I run the app with the modified classes from
>>>> the savepoint, it throws the error "state not compatible".
>>>> Previously, no serializer was used.
>>>> I now wish to support state schema evolution, hence I need suggestions on
>>>> how I can achieve that.
>>>>
>>>> Regards
>>>>
>>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>>>> khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hi ApoorvK,
>>>>>
>>>>> I understand that you have a savepoint created by Flink 1.6.2 and you
>>>>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>>>>> Is that correct?
>>>>> Which serializer did you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK <apoorv.upadh...@razorpay.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Earlier we developed on Flink 1.6.2, so there are lots of case classes
>>>>>> which have Maps and nested case classes within them, for example:
>>>>>>
>>>>>> case class MyCaseClass(var a: Boolean,
>>>>>>                        var b: Boolean,
>>>>>>                        var c: Boolean,
>>>>>>                        var d: NestedCaseClass,
>>>>>>                        var e: Int) {
>>>>>>   def this() { this(false, false, false, new NestedCaseClass, 0) }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Now we have migrated to Flink 1.8.2, and I need help figuring out how I
>>>>>> can achieve state schema evolution for such classes.
>>>>>>
>>>>>> 1. If I create Avro schemas for these classes now and implement Avro
>>>>>> serialisation, will that work?
>>>>>> 2. Or should I register a Kryo serializer with the Protobuf serializer
>>>>>> at the env?
>>>>>>
>>>>>> Please suggest what can be done here, or point me to an Avro
>>>>>> serialisation example.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
