Thanks Gordon for the suggestion. I am following this repo: https://github.com/mrooding/flink-avro-state-serialization

So far I am able to alter the Scala case classes and restore from a savepoint using the memory state backend, but when I use RocksDB as the state backend and try to restore from the savepoint, it breaks with the following error:

org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
    at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
    at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
    at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
    at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
    at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
    ... 8 more

On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Apoorv,
>
> Flink currently does not natively support schema evolution for state types
> using Scala case classes [1].
>
> So, as Roman has pointed out, there are 2 possible ways for you to do that:
> - Implementing a custom serializer that supports schema evolution for your
>   specific Scala case classes, as Roman suggested.
> - or, using the State Processor API [2] to migrate your case classes
>   offline as a batch job
>
> For your question on how to implement a schema-evolution supporting
> serializer, can you share with me the problems you have met so far?
> Otherwise, if you take a look at the PojoSerializerSnapshot class, that
> would be a starting point to implement something similar for your case
> classes.
>
> As you will quickly realize, it's not simple, so I would strongly suggest
> trying out the approach of using the State Processor API.
> Either way, if you bump into any problems, feel free to let me know.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-10896
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <apoorv.upadh...@razorpay.com> wrote:
>
>> Thanks a lot. Also, can you share one example where this has been
>> implemented? I have gone through the docs, but it still does not work.
>>
>> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Apoorv,
>>>
>>> You can achieve this by implementing custom serializers for your state.
>>> Please refer to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>>
>>> Regards,
>>> Roman
>>>
>>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <apoorv.upadh...@razorpay.com> wrote:
>>>
>>>> Hi Roman,
>>>>
>>>> I have successfully migrated to Flink 1.8.2 with the savepoint created
>>>> by Flink 1.6.2.
>>>> Now I have to modify a few case classes due to a new requirement. I
>>>> created a savepoint, and when I run the app with the modified classes
>>>> from that savepoint it throws the error "state not compatible".
>>>> Previously no custom serializer was used.
>>>> I now wish to support state schema evolution, so I need suggestions on
>>>> how I can achieve that.
>>>>
>>>> Regards
>>>>
>>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hi ApoorvK,
>>>>>
>>>>> I understand that you have a savepoint created by Flink 1.6.2 and you
>>>>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>>>>> Is that correct?
>>>>> Which serializer did you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK <apoorv.upadh...@razorpay.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Earlier we developed on Flink 1.6.2, so there are lots of case classes
>>>>>> which have a Map or a nested case class within them, for example:
>>>>>>
>>>>>> case class MyCaseClass(var a: Boolean,
>>>>>>                        var b: Boolean,
>>>>>>                        var c: Boolean,
>>>>>>                        var d: NestedCaseClass,
>>>>>>                        var e: Int) {
>>>>>>   def this() { this(false, false, false, new NestedCaseClass, 0) }
>>>>>> }
>>>>>>
>>>>>> Now that we have migrated to Flink 1.8.2, I need help to figure out
>>>>>> how I can achieve state schema evolution for such classes.
>>>>>>
>>>>>> 1. Will creating Avro schemas for these classes and implementing Avro
>>>>>>    serialisation work?
>>>>>> 2. Or should I register a Kryo serialiser with a protobuf serialiser
>>>>>>    on the env?
>>>>>>
>>>>>> Please suggest what can be done here, or point me to an Avro
>>>>>> serialisation example.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
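
A note on the EOFException in the trace at the top of this thread. RocksDB keeps state as serialized bytes and the trace shows the value being deserialized on access, so one plausible reading (an assumption, nothing in this thread confirms it) is that the stored bytes still have the old layout while the decoder treats them as if they had the new one, and it runs past the end of the value. The sketch below reproduces only that failure mode with plain java.io streams, not with Avro or Flink. ProductV1, ProductV2, LayoutDemo and the stock field are hypothetical names made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-ins for the old and the evolved state type.
final case class ProductV1(id: Int, name: String)
final case class ProductV2(id: Int, name: String, stock: Int)

object LayoutDemo {
  // Bytes as the old job would have written them: id, then name, nothing else.
  def writeV1(p: ProductV1): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(p.id)
    out.writeUTF(p.name)
    out.flush()
    buf.toByteArray
  }

  // Naive reader: assumes the bytes already have the new layout.
  // On old bytes the last readInt() finds no data and throws java.io.EOFException.
  def readAssumingV2(bytes: Array[Byte]): ProductV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val id = in.readInt()
    val name = in.readUTF()
    val stock = in.readInt()
    ProductV2(id, name, stock)
  }

  // Layout-aware reader: decodes with the layout the bytes were written in,
  // then fills the field that did not exist yet with a default.
  def readV1AsV2(bytes: Array[Byte], defaultStock: Int = 0): ProductV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val id = in.readInt()
    val name = in.readUTF()
    ProductV2(id, name, defaultStock)
  }
}
```

In Avro terms the second reader roughly corresponds to constructing GenericDatumReader with both the schema the data was written with and the schema you want to read into, rather than the new schema alone. Whether that is what goes wrong in the serializer from the repo would need checking against the writer schema stored in its snapshot.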