Here are the error logs. The first error was encountered when getting the values from the MapState:

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879.1452224137
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 22 common frames omitted
Wrapped by: java.lang.NoClassDefFoundError: com/test/MatcherFactory$$Lambda$879/1452224137
    at sun.reflect.GeneratedSerializationConstructorAccessor282.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
    ... 17 frames truncated

Subsequent error logs were encountered on task manager restart (for the same job).

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879/1452224137
    at java.lang.Class.forName0(Class.java)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 18 common frames omitted
Wrapped by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.test.MatcherFactory$$Lambda$879/1452224137
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    ... 8 frames truncated
    ... 6 common frames omitted
Wrapped by: java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    ... 2 frames truncated

Jayant Ameta

On Wed, Nov 21, 2018 at 3:17 PM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey Jayant,
>
> I don't really think that the sole fact of using Predicate should cause
> the *ClassNotFoundException* that You are talking about. The exception
> may come from the fact that some libraries are missing from Your cluster
> environment. Have You tried running the job locally to verify that the
> exception occurs? Also, could You please paste some logs here, they may
> help in determining the exact reason for the problem.
>
> Best Regards,
> Dom.
>
>
>
> On Wed, Nov 21, 2018 at 04:41, Jayant Ameta <wittyam...@gmail.com> wrote:
>
>> Hi,
>> I want to store a custom POJO in the MapState. One of the fields in the
>> object is a java.util.function.Predicate type.
>> Flink gives ClassNotFoundException exception on the lambda. How do I
>> store this object in the mapState?
>>
>> Marking the predicate field as transient is an option. But in my
>> use-case, the predicate field is set using another library, and I don't
>> want to call it every time I want.
>>
>>
>> Jayant Ameta
>>
>