Also, you're using the FsStateBackend, correct? Reason I'm asking is that the problem should not occur for the RocksDB state backend. There, we don't serialize any user code, only binary data. A while back I wanted to change the FsStateBackend to also work like this. Now might be a good time to actually do this. :-)
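To make the class-naming issue in the quoted messages below concrete, here is a minimal plain-Java sketch. It is not Flink code: `Mapper`, `UppercaseMapper`, and `AnonNaming` are hypothetical names invented for illustration, and Java serialization stands in for whatever the state backend actually does. It only shows that an anonymous class gets a compiler-assigned name, while an explicitly defined class keeps a fixed one.

```java
import java.io.Serializable;

public class AnonNaming {
    // Hypothetical stand-in for a user function; this is NOT a Flink interface.
    interface Mapper extends Serializable {
        String map(String value);
    }

    // Explicit class definition: its binary name is fixed by its declaration
    // (AnonNaming$UppercaseMapper) and does not depend on other classes.
    static class UppercaseMapper implements Mapper {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    }

    // Anonymous class: the compiler assigns the name, typically based on its
    // position among the other anonymous classes in the enclosing class, so
    // adding or removing another anonymous class can change it.
    static Mapper anonymousMapper() {
        return new Mapper() {
            @Override
            public String map(String value) {
                return value.toLowerCase();
            }
        };
    }

    static String anonymousName() {
        return anonymousMapper().getClass().getName();
    }

    static String namedName() {
        return new UppercaseMapper().getClass().getName();
    }

    public static void main(String[] args) {
        // Serialized objects record the class name, so a name that shifts
        // between builds can no longer be resolved on restore.
        System.out.println("anonymous: " + anonymousName());
        System.out.println("named:     " + namedName());
    }
}
```

If a serialized object records the anonymous class's name and a later build assigns that class a different name, the lookup fails with a `ClassNotFoundException` like the one in Josh's stack trace. The named class avoids this as long as it is not renamed or moved.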

On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Josh,
>
> you could also try to replace your anonymous classes by explicit class
> definitions. This should assign these classes a fixed name independent of
> the other anonymous classes. Then the class loader should be able to
> deserialize your serialized data.
>
> Cheers,
> Till
>
> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Josh,
>> I think in your case the problem is that Scala might choose different
>> names for synthetic/generated classes. This will trip up the code that is
>> trying to restore from a snapshot that was done with an earlier version of
>> the code where classes were named differently.
>>
>> I'm afraid I don't know how to solve this one right now, except by
>> switching to Java.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <m...@apache.org> wrote:
>>
>>> Hi Josh,
>>>
>>> You have to assign UIDs to all operators to change the topology. Plus,
>>> you have to add dummy operators for all UIDs which you removed; this
>>> is a limitation currently because Flink will attempt to find all UIDs
>>> of the old job.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jof...@gmail.com> wrote:
>>> > Hi all,
>>> > Is there any information out there on how to avoid breaking saved
>>> > states/savepoints when making changes to a Flink job and redeploying it?
>>> >
>>> > I want to know how to avoid exceptions like this:
>>> >
>>> > java.lang.RuntimeException: Failed to deserialize state handle and setup
>>> > initial operator state.
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>> >
>>> > The best information I could find in the docs is here:
>>> >
>>> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>> >
>>> > Having made the suggested changes to my job (i.e. giving a uid to every
>>> > stateful sink and map function), what changes to the job/topology are then
>>> > allowed/not allowed?
>>> >
>>> > If I'm 'naming' my states by providing uids, why does Flink need to look for
>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>> >
>>> > Thanks for any advice,
>>> >
>>> > Josh