Can you try adding the JAR to the class path of the executors directly, by setting the config "spark.executor.extraClassPath" in the SparkConf. See Configuration page - http://spark.apache.org/docs/latest/configuration.html#runtime-environment
I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJar. TD On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote: > Just checked it with 1.0.2 > > Still same exception. > > > > *From:* Anton Brazhnyk [mailto:anton.brazh...@genesys.com] > *Sent:* Wednesday, August 27, 2014 6:46 PM > *To:* Tathagata Das > *Cc:* user@spark.apache.org > *Subject:* RE: [Streaming] Akka-based receiver with messages defined in > uploaded jar > > > > Sorry for the delay with answer – was on vacation. > > As I said I was using modified version of launcher from the example. > Modification is just about setting spark master URL in the code to not use > run-example script. > > The launcher itself was in the attached zip (attaching it once more) as > ActorWordCount object. > > > > *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com > <tathagata.das1...@gmail.com>] > *Sent:* Tuesday, August 05, 2014 11:32 PM > *To:* Anton Brazhnyk > *Cc:* user@spark.apache.org > *Subject:* Re: [Streaming] Akka-based receiver with messages defined in > uploaded jar > > > > How are you launching/submitting the program? Using spark-submit? Or some > other script (can you provide that)? > > > > TD > > > > On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk <anton.brazh...@genesys.com> > wrote: > > Went through it once again to leave the only modification in question. > Still same exception. > I hope sources as zip file (instead of github) still can be tolerated. :) > > Here is the stacktrace generated with this sources: > 14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called > at time 1407289554800 > 14/08/05 18:45:54 ERROR Remoting: > org.apache.spark.examples.streaming.CustomMessage > java.lang.ClassNotFoundException: > org.apache.spark.examples.streaming.CustomMessage > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:270) > at > java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) > at > akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) > at > akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) > at scala.util.Try$.apply(Try.scala:161) > at > akka.serialization.Serialization.deserialize(Serialization.scala:98) > at > akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) > at > akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) > at > akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) > at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) > at > akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > -----Original Message----- > From: Tathagata Das [mailto:tathagata.das1...@gmail.com] > Sent: Tuesday, August 05, 2014 5:42 PM > To: Anton Brazhnyk > Cc: user@spark.apache.org > Subject: Re: [Streaming] Akka-based receiver with messages defined in > uploaded jar > > Can you show us the modified version. The reason could very well be what > you suggest, but I want to understand what conditions lead to this. > > TD > > On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk <anton.brazh...@genesys.com> > wrote: > > Greetings, > > > > > > > > I modified ActorWordCount example a little and it uses simple case > > class as the message for Streaming instead of the primitive string. > > > > I also modified launch code to not use run-example script, but set > > spark master in the code and attach the jar (setJars(…)) with all the > > classes including new case class. It runs fine in the local[*] mode > > but fails with ClassNotFoundException in standalone cluster (stacktrace > follows). > > > > > > > > I assume it’s the classloader problems and akka remoting just doesn’t > > know about the classes coming to the executor from attached jar. > > Am I right? > > > > > > > > I guess I could pass primitive values around and do my own > > (de)serialization but maybe there is a better way? > > > > What’s the correct way to build custom akka-based receiver with usage > > of non-primitive messages? > > > > > > > > > > > > Here is the log excerpt with stacktrace: > > > > 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator > > called at time 1407211181800 > > > > 14/08/04 20:59:41 ERROR Remoting: > > com.genesys.gpe.analytics.akka.messages.SubscribeAck > > > > java.lang.ClassNotFoundException: > > com.genesys.gpe.analytics.akka.messages.SubscribeAck > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > > > > at java.security.AccessController.doPrivileged(Native Method) > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > > > > at > > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > > > at java.lang.Class.forName0(Native Method) > > > > at java.lang.Class.forName(Class.java:270) > > > > at > > java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623) > > > > at > > akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectI > > nputStream.scala:19) > > > > at > > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610 > > ) > > > > at > > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) > > > > at > > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:17 > > 69) > > > > at > > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) > > > > at > > java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > > > > at > > akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:13 > > 6) > > > > at > > scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > > > > at > > akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) > > > > at > > akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serializ > > ation.scala:104) > > > > at scala.util.Try$.apply(Try.scala:161) > > > > at > > akka.serialization.Serialization.deserialize(Serialization.scala:98) > > > > at > > akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) > > > > at > > akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.sca > > la:55) > > > > at > > akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) > > > > at > > akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) > > > > at > > akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.sca > > la:764) > > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > > > > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > > > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > > > > at > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstr > > actDispatcher.scala:386) > > > > at > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool. > > java:1339) > > > > at > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:197 > > 9) > > > > at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThrea > > d.java:107) > > > > > > > > > > WBR, > > > > Anton > > >