Thanks for the reply!

Ok, if that's the case, I'd recommend a note to that affect in the docs at
least.

Just to give some more context here, I'm working on a Clojure DSL for Spark
called Flambo, which I plan to open source shortly. If I could I'd like to
focus on the initial bug that I hit.

Exception in thread "main" org.apache.spark.SparkException: Job aborted:
Exception while deserializing and fetching task:
com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Can not set final
scala.collection.convert.Wrappers field
scala.collection.convert.Wrappers$SeqWrapper.$outer to
clojure.lang.PersistentVector
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This happens immediately after all the tasks of a reduce stage complete
successfully. Here is the function throwing the exception:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

This is where I get lost. From googling around, it seems that scala is
trying to wrap the result of my task, which contain
clojure.lang.PersistentVector objects in a scala collection, but I don't
know why it's doing that. I have a registered kryo serializer for
clojure.lang.PersistentVector.

based on this line is looks like it's trying to use the closure serializer,
yet the expection thrown is from com.esotericsoftware.kryo.KryoException:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
trying to deal with my clojure.lang.PeristentVector class?

Where do I go from here?


On Sun, May 4, 2014 at 12:50 PM, Reynold Xin <r...@databricks.com> wrote:

> I added the config option to use the non-default serializer. However, at
> the time, Kryo fails serializing pretty much any closures so that option
> was never really used / recommended.
>
> Since then the Scala ecosystem has developed, and some other projects are
> starting to use Kryo to serialize more Scala data structures, so I wouldn't
> be surprised if there is a way to work around this now. However, I don't
> have enough time to look into it at this point. If you do, please do post
> your findings. Thanks.
>
>
>
> On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:
>
> > apologies for the cross-list posts, but I've gotten zero response in the
> > user list and I guess this list is probably more appropriate.
> >
> > According to the documentation, using the KryoSerializer for closures is
> > supported. However, when I try to set `spark.closure.serializer` to
> > `org.apache.spark.serializer.KryoSerializer` thing fail pretty miserably.
> >
> > The first thing that happens it that is throws exceptions over and over
> > that it cannot locate my registrator class, which is located in my
> assembly
> > jar like so:
> >
> > 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
> > spark.kryo.registrator
> > java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:270)
> > at
> >
> >
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
> > at
> >
> >
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
> > at scala.Option.foreach(Option.scala:236)
> > at
> >
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
> > at
> >
> >
> org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
> > at
> >
> >
> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
> > at
> >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
> > at
> >
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > at
> >
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:415)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > at
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:724)
> >
> > Now, I would expect it not to be able to find this class since it hasn't
> > yet fetched my assembly jar to the executors. Once it does fetch my jar,
> > those expections stop. Next, all the executor task die with the following
> > exception:
> >
> > java.nio.ReadOnlyBufferException
> > at java.nio.ByteBuffer.array(ByteBuffer.java:961)
> > at
> >
> >
> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
> > at
> >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
> > at
> >
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > at
> >
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:415)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > at
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:724)
> >
> > AFAIK, I'm not doing anything out of the ordinary, just turning on kryo
> and
> > using the registrator mechanism to register a couple custom serializers.
> >
> > The reason I tried turning on kryo for closure in the first place is
> > because of a different bug that I was hitting during fetching and
> > deserializing of tasks from my executors, which I detailed here:
> >
> >
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
> >
> > Here's hoping some on this list can help me track down what's happening
> as
> > I didn't get a single reply on the user list.
> >
>

Reply via email to