Is this error on the executor or on the driver? Can you provide a larger snippet of the logs: the driver logs and, if possible, the executor logs as well?
TD

On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <a...@opsclarity.com> wrote:
> bump. any ideas?
>
> On Jul 24, 2014, at 3:09 AM, Alan Ngai <a...@opsclarity.com> wrote:
>
> It looks like when you configure SparkConf to use the KryoSerializer in
> combination with an ActorReceiver, bad things happen. I modified the
> ActorWordCount example program from
>
>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
>
> to
>
>     val sparkConf = new SparkConf()
>       .setAppName("ActorWordCount")
>       .set("spark.serializer",
>         "org.apache.spark.serializer.KryoSerializer")
>
> and I get the stack trace below. I figured it might be that Kryo doesn't
> know how to serialize/deserialize the actor, so I added a registry. I also
> added a default empty constructor to SampleActorReceiver just for kicks.
>
>     class SerializationRegistry extends KryoRegistrator {
>       override def registerClasses(kryo: Kryo) {
>         kryo.register(classOf[SampleActorReceiver])
>       }
>     }
>
>     …
>
>     case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>       extends Actor with ActorHelper {
>       def this() = this("")
>       ...
>     }
>
>     ...
>     val sparkConf = new SparkConf()
>       .setAppName("ActorWordCount")
>       .set("spark.serializer",
>         "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator",
>         "org.apache.spark.examples.streaming.SerializationRegistry")
>
> None of this worked; I get the same stack trace. Any idea what's going on?
> Is this a known issue, and is there a workaround?
>
> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
> akka.actor.ActorInitializationException: exception during creation
>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>     at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>     at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>     at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>     at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>     at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>     at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>     at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>     at akka.actor.Props.newActor(Props.scala:339)
>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>     ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>     at akka.util.Reflect$.instantiate(Reflect.scala:69)
>     at akka.actor.Props.cachedActorClass(Props.scala:203)
>     at akka.actor.Props.actorClass(Props.scala:327)
>     at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>     at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>     ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>     ... 24 more