I think my previous guess was wrong. From what I can tell, when Kryo tries to copy the exception object, it does that by serializing and deserializing it. For subclasses of RuntimeException, it doesn't know how to do it so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream to override resolveClass() and provide classes from the user code classloader… such as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject(). Instead, it uses ObjectInputStream$latestUserDefinedLoader() which is the Launcher$AppClassLoader which definitely doesn't have the user code in it.
Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being configured? Thanks, Shannon From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>> Date: Monday, March 6, 2017 at 7:09 PM To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: Re: AWS exception serialization problem This happened when running Flink with bin/run-local.sh I notice that there only appears to be one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs? From: Shannon Carey <sca...@expedia.com<mailto:sca...@expedia.com>> Date: Monday, March 6, 2017 at 6:39 PM To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: AWS exception serialization problem Has anyone encountered this or know what might be causing it? java.lang.RuntimeException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349) at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172) at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51) at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389) ... 7 more Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) at java.lang.Throwable.readObject(Throwable.java:914) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) at java.lang.Throwable.readObject(Throwable.java:914) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) at java.lang.Throwable.readObject(Throwable.java:914) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45) ... 12 more