Hi Arvid and Nicolaus Thanks for the reply. I don't think this is an issue with the user code. I'm pretty sure that the code is serializable. I have not used any custom serializer. And the serialization configs are not overridden. My guess was the problem was with order of classloading.
I am deploying in application mode. Before I had my user code jar and dependent libs in /opt/flink/usrlib. I tried to follow this guide https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode and used /usrlib to put it in the jars. Not sure why it was not working for my case. I was seeing the error above. Now I changed to /opt/flink/lib and user code and dependencies with flink libs are in the same dir inside the docker image. Changed `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/usrlib/poc-flink.jar` to `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/lib/poc-flink.jar` So the same classloader is used. And it solved the problem. Wondering if any of you have tried bundling user code in $FLINK_HOME/usrlib in application mode? Regards Praneeth Ramesh On Mon, Sep 6, 2021 at 9:28 AM Arvid Heise <ar...@apache.org> wrote: > This looks like you hold a reference to some outer class or > non-serializable class. Make sure that your user function is minimal (e.g. > static or top-level class) and all fields really need to be serialized > (that excludes all caches). > > On Thu, Sep 2, 2021 at 3:13 PM Nicolaus Weidner < > nicolaus.weid...@ververica.com> wrote: > >> Hi Praneeth, >> >> It does look like a failure constructing the serializer. Can you share >> the serialization config you use for the Kafka producer? In particular, are >> you using a custom serializer? >> Do you use any custom classloading configuration? >> >> Best regards, >> Nico >> >> On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh <sr.prane...@gmail.com> >> wrote: >> >>> Hi All >>> >>> I am trying to run a flink scala application which reads from kafka >>> apply some lookup transformations and then writes to kafka. >>> >>> I am using Flink Version 1.12.1 >>> >>> I tested it in local and it works fine. But when I try to run it on >>> cluster using native kubernetes integration I see weird errors like below. >>> >>> The cluster also looks fine, because I tried to run a wordcount >>> application on the cluster and it worked fine. >>> >>> The exception is not clear and also the stacktrace shows the taskmanager >>> stack trace and hence no idea where in the application the problem could >>> be. Could this be a serialization issue? Is there a way to debug such >>> issues and find the actual point in application code where there is a >>> problem? >>> >>> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could >>> not instantiate serializer. >>> at >>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) >>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) >>> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at java.lang.Thread.run(Unknown Source) [?:?] >>> Caused by: java.io.IOException: unexpected exception type >>> at java.io.ObjectStreamClass.throwMiscException(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.defaultReadFields(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> ... 8 more >>> Caused by: java.util.concurrent.ExecutionException: >>> java.lang.ClassNotFoundException: >>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$ >>> at >>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown >>> Source) ~[?:?] >>> at >>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) >>> ~[?:?] >>> at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] >>> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.defaultReadFields(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> ... 8 more >>> Caused by: java.lang.ClassNotFoundException: >>> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$ >>> at >>> scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?] >>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?] >>> at java.lang.Class.forName0(Native Method) ~[?:?] >>> at java.lang.Class.forName(Unknown Source) ~[?:?] >>> at >>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown >>> Source) ~[?:?] >>> at >>> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) >>> ~[?:?] >>> at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] >>> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.defaultReadFields(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readSerialData(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) >>> ~[?:?] >>> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> at >>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) >>> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] >>> ... 8 more``` >>> >>> -- Regards Praneeth Ramesh