@Shannon @Gordon - is there some shading logic involved in the dependencies, concerning the AWS libraries?
On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > Hi, > > I just had a quick look on this, but the Kafka fetcher thread’s context > classloader doesn’t seem to be the issue (at least for 1.1.4). > > In Flink 1.1.4, a separate thread from the task thread is created to run > the fetcher, but since the task thread sets the user code classloader as > its context classloader, shouldn’t any threads created from it (i.e., the > fetcher thread) use it also? > > A quickly checked the context classloader the Kafka09Fetcher thread in > 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`. > > > On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote: > > Ah, I see... > > The issue is that the Kafka fetcher thread apparently do not have the > user-code class loader set as the context class loader. Kryo relies on that > for class resolution. > > What Flink version are you on? I think that actual processing and > forwarding does not happen in the Kafka Fetchers any more as of 1.2, so > only Flink 1.1 should be affected... > > > On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote: > >> I think my previous guess was wrong. From what I can tell, when Kryo >> tries to copy the exception object, it does that by serializing and >> deserializing it. For subclasses of RuntimeException, it doesn't know how >> to do it so it delegates serialization to Java. However, it doesn't use a >> custom ObjectInputStream to override resolveClass() and provide classes >> from the user code classloader… such as happens in RocksDBStateBackend's >> use of InstantiationUtil.deserializeObject(). Instead, it uses >> ObjectInputStream$latestUserDefinedLoader() which is the >> Launcher$AppClassLoader which definitely doesn't have the user code in it. >> >> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being >> configured? >> >> Thanks, >> Shannon >> >> >> From: Shannon Carey <sca...@expedia.com> >> Date: Monday, March 6, 2017 at 7:09 PM >> To: "user@flink.apache.org" <user@flink.apache.org> >> Subject: Re: AWS exception serialization problem >> >> This happened when running Flink with bin/run-local.sh I notice that >> there only appears to be one Java process. The job manager and the task >> manager run in the same JVM, right? I notice, however, that there are two >> blob store folders on disk. Could the problem be caused by two different >> FlinkUserCodeClassLoader objects pointing to the two different JARs? >> >> >> From: Shannon Carey <sca...@expedia.com> >> Date: Monday, March 6, 2017 at 6:39 PM >> To: "user@flink.apache.org" <user@flink.apache.org> >> Subject: AWS exception serialization problem >> >> Has anyone encountered this or know what might be causing it? >> >> >> java.lang.RuntimeException: Could not forward element to next operator >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349) >> at >> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355) >> at >> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225) >> at >> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java >> deserialization. >> at >> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47) >> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172) >> at >> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51) >> at >> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389) >> ... 7 more >> Caused by: java.lang.ClassNotFoundException: >> com.amazonaws.services.s3.model.AmazonS3Exception >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> at java.lang.Class.forName0(Native Method) >> at java.lang.Class.forName(Class.java:348) >> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677) >> at >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819) >> at >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) >> at >> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) >> at java.lang.Throwable.readObject(Throwable.java:914) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) >> at >> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) >> at java.lang.Throwable.readObject(Throwable.java:914) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) >> at >> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) >> at java.lang.Throwable.readObject(Throwable.java:914) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) >> at >> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45) >> ... 12 more >> >> >