Hi,

Our state keys are all inline tuples, so they got moved around during the refactoring. I guess Scala compiles them to anonymous classes. I will switch to named classes, just in case.
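To illustrate that guess, here is a minimal plain-Scala sketch with no Flink involved. `NamingDemo`, `KeySerializer`, and `UserKey` are hypothetical names made up for this example, and `KeySerializer` is only a stand-in for a generated serializer type, not a Flink class. It prints the JVM class names the compiler assigns to two anonymous classes next to the name of a named class.

```scala
// Plain Scala, no Flink dependency. All names below are hypothetical.
object NamingDemo {
  // Stand-in for a generated serializer type (not a Flink class).
  trait KeySerializer

  // A named key class: its JVM class name comes from its declared name.
  final case class UserKey(userId: String, region: String)

  // Two anonymous classes. The compiler picks their names itself,
  // so the names can change when code is moved or reordered.
  val first: KeySerializer = new KeySerializer {}
  val second: KeySerializer = new KeySerializer {}

  def main(args: Array[String]): Unit = {
    println(first.getClass.getName)   // compiler-generated name
    println(second.getClass.getName)  // a different compiler-generated name
    println(classOf[UserKey].getName) // name derived from the declaration
  }
}
```

The generated names have the same shape as the `x.x.X$$anon$113$$anon$55` in the stack trace below. Whether a named key class is enough to make restores succeed depends on how the serializers themselves are generated, so I am treating it as a precaution rather than a confirmed fix.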
Best regards,
Kien

On Aug 18, 2017, at 11:09, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:

>Hi Kien,
>
>Thank you for reporting this.
>Generally, this is currently a restriction of the Scala API. Type
>serializers are generated as anonymous classes when using the Scala API,
>and are therefore sensitive to the ordering / enclosing class / Scala
>version / etc. when they are generated.
>We’re discussing a fix for this at the moment; I’ll get back to you
>once we figure that out.
>
>As for the case you’ve bumped into here, you seem to be failing on one
>of the key serializers when restoring the timer service.
>Have you also happened to refactor classes that are used as keys?
>Maybe what you could try doing, as a means to avoid that for now, is to
>make sure that the key classes are untouched.
>
>Please keep us updated on how this works out for you. I’ll continue to
>look into it.
>
>Thanks,
>Gordon
>
>On 17 August 2017 at 10:46:05 AM, Kien Truong (duckientru...@gmail.com)
>wrote:
>
>Hi,
>
>After some refactoring (moving some operators to separate
>functions/files), I'm encountering a lot of exceptions like these. The
>logic of the application did not change, and all the refactored
>operators are stateless, e.g. simple map/flatmap/filter.
>
>Does anyone know how to fix/avoid/work around this?
>
>I'm using FsStateBackend on Flink 1.3.2
>
>Regards,
>Kien
>
>java.lang.ClassNotFoundException: x.x.X$$anon$113$$anon$55
>at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>at java.lang.Class.forName0(Native Method)
>at java.lang.Class.forName(Class.java:348)
>at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
>at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
>at org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:321)
>at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>at java.lang.Thread.run(Thread.java:748)