It looks like configuring SparkConf to use the KryoSerializer in combination
with an ActorReceiver causes the receiver actor to fail on creation. I modified
the ActorWordCount example program from
val sparkConf = new SparkConf().setAppName("ActorWordCount")
to
val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
and I get the stack trace below. I figured it might be that Kryo doesn't know
how to serialize/deserialize the actor, so I added a KryoRegistrator. I also
added a default no-argument constructor to SampleActorReceiver, just for kicks:
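import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Registers the receiver actor class with Kryo.
class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[SampleActorReceiver[_]])
  }
}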
…
case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
  extends Actor with ActorHelper {
  def this() = this("")
  ...
}
...
val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "org.apache.spark.examples.streaming.SerializationRegistry")
None of this worked; I get the same stack trace. Any idea what's going on? Is
this a known issue, and is there a workaround?
14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating
[akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public
akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
arguments [class java.lang.Class, class
org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
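
For what it's worth, the innermost "Caused by" entries read like a plain Java
reflection failure: a constructor that takes a single scala.Function0 is being
invoked with two arguments (a Class and a function). The sketch below only
reproduces that kind of mismatch in isolation. It does not use Akka, Spark, or
Kryo, and it says nothing about why the argument list ends up wrong.
OneArgConsumer and ReflectMismatch are hypothetical names made up for the
illustration; they are not Akka classes.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-in (not an Akka class): its only constructor takes a
// single Function0, like the CreatorFunctionConsumer(scala.Function0)
// constructor named in the trace.
class OneArgConsumer(val creator: () => String)

object ReflectMismatch {
  // Look up the single public constructor and invoke it reflectively with
  // whatever arguments are passed in, capturing any exception in a Try.
  def instantiate(args: AnyRef*): Try[OneArgConsumer] = {
    val ctor = classOf[OneArgConsumer].getConstructors.head
    Try(ctor.newInstance(args: _*).asInstanceOf[OneArgConsumer])
  }

  def main(argv: Array[String]): Unit = {
    val creator: () => String = () => "actor"

    // Matching argument list: exactly one Function0.
    println(instantiate(creator).map(_.creator()))

    // Mismatched argument list: a Class plus a Function0, the same shape as
    // the arguments listed in the trace.
    instantiate(classOf[String], creator) match {
      case Failure(e: IllegalArgumentException) => println("rejected: " + e)
      case other => println("unexpected: " + other)
    }
  }
}
```

The second call fails with an IllegalArgumentException raised from
Constructor.newInstance, which is the same exception type as the last
"Caused by" in the trace. The exact message text may vary between JVM versions.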