Hi, thanks a lot for investigating this problem and for sharing your results. This looks like a bug to me. I'm CCing Aljoscha, who knows the internals of the DataStream API very well.
Which Flink version are you using?
Would you mind creating a JIRA issue [1] with all the info you provided so far?

Thank you,
Fabian

[1] https://issues.apache.org/jira/projects/FLINK/summary

2017-12-08 11:27 GMT+01:00 romain.jln <romain....@gmail.com>:

> Hi,
>
> The stack trace is usually something like:
>
> Exception in thread "Thread-49" java.lang.NoClassDefFoundError: com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
>     at com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
>     at com.microsoft.azure.eventhubs.MessageSender.onClose(MessageSender.java:376)
>     at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:76)
>     at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:47)
>     at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
>     at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
>     at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
>     at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
>     at com.microsoft.azure.eventhubs.MessagingFactory$RunReactor.run(MessagingFactory.java:404)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.eventhubs.amqp.AmqpErrorCode
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 10 more
>
> (These messages appear randomly in the stdout of the task managers.)
>
> For a little bit of context about this stack trace: it is related to a
> custom implementation of a Flink Source that connects to an Azure Eventhub.
> When starting an Eventhub client, the Eventhub library creates a Reactor
> thread for managing the AMQP messages (proton library). This thread is
> created in the Open function of the custom source.
>
> I checked the fat jar that I am uploading to Flink using the web API, and
> the given class is correctly located at the given path.
>
> It is not always the same class that is missing. It can also be
> com.microsoft.azure.eventhubs.ExceptionUtil,
> com.microsoft.azure.eventhubs.MessageReceiver$10, or other classes of the
> same package. All of those classes are correctly located in the fat jar.
>
> I kept on investigating the issue and here are the first results I got:
>
> Using Thread.currentThread().getContextClassLoader(), I can see that, when
> manually cancelling the job (via the web API), the class of the ClassLoader
> is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
> (which can explain some of the ClassNotFoundExceptions).
>
> However, when Flink automatically cancels the source (because of an error
> during the execution of the job), it correctly uses a
> FlinkUserCodeClassLoader as expected.
>
> When checking the ClassLoader of the thread during the call to the Open
> method of the source, it also correctly uses a FlinkUserCodeClassLoader.
>
> But I still keep on getting some ClassNotFoundExceptions from time to time
> for no reason that is apparent to me.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
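
The class loader check described in the quoted message can be reproduced with plain JDK calls. Below is a minimal sketch, assuming a hypothetical helper class `ContextClassLoaderCheck` (it is not part of Flink or of the Event Hubs library). `describe` reports the context class loader of the calling thread, and `callWith` temporarily pins a given loader while a callback runs. Whether pinning the loader has any effect on the errors above has not been verified in this thread.

```java
import java.util.function.Supplier;

/** Hypothetical diagnostic helper; the names are illustrative only. */
final class ContextClassLoaderCheck {

    private ContextClassLoaderCheck() {}

    /** Returns a one-line description of the calling thread's context class loader. */
    static String describe(String where) {
        Thread t = Thread.currentThread();
        ClassLoader cl = t.getContextClassLoader();
        String name = (cl == null) ? "null" : cl.getClass().getName();
        return where + " [" + t.getName() + "]: " + name;
    }

    /**
     * Runs the action with the given loader as context class loader and
     * restores the previous loader afterwards, even if the action throws.
     */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread t = Thread.currentThread();
        ClassLoader previous = t.getContextClassLoader();
        t.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            t.setContextClassLoader(previous);
        }
    }
}
```

Logging `ContextClassLoaderCheck.describe("open")` from the source's Open method and `ContextClassLoaderCheck.describe("cancel")` from its cancel path would put the two loaders side by side in the task manager output, which may be useful information to attach to the JIRA issue.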