Thanks for the reply! Ok, if that's the case, I'd recommend a note to that effect in the docs at least.
Just to give some more context here, I'm working on a Clojure DSL for Spark called Flambo, which I plan to open source shortly. If I could, I'd like to focus on the initial bug that I hit:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set final scala.collection.convert.Wrappers field scala.collection.convert.Wrappers$SeqWrapper.$outer to clojure.lang.PersistentVector
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This happens immediately after all the tasks of a reduce stage complete successfully. Here is the function throwing the exception:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

This is where I get lost. From googling around, it seems that Scala is trying to wrap the result of my task, which contains clojure.lang.PersistentVector objects, in a Scala collection, but I don't know why it's doing that. I have a registered Kryo serializer for clojure.lang.PersistentVector (a rough sketch of that kind of registrator/config setup is at the bottom of this mail). Based on this line, it looks like it's trying to use the closure serializer, yet the exception thrown is a com.esotericsoftware.kryo.KryoException:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from trying to deal with my clojure.lang.PersistentVector class? Where do I go from here?

On Sun, May 4, 2014 at 12:50 PM, Reynold Xin <r...@databricks.com> wrote:

> I added the config option to use the non-default serializer. However, at
> the time, Kryo fails serializing pretty much any closures so that option
> was never really used / recommended.
>
> Since then the Scala ecosystem has developed, and some other projects are
> starting to use Kryo to serialize more Scala data structures, so I wouldn't
> be surprised if there is a way to work around this now. However, I don't
> have enough time to look into it at this point. If you do, please do post
> your findings. Thanks.
>
>
> On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:
>
> > apologies for the cross-list posts, but I've gotten zero response in the
> > user list and I guess this list is probably more appropriate.
> >
> > According to the documentation, using the KryoSerializer for closures is
> > supported. However, when I try to set `spark.closure.serializer` to
> > `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.
> >
> > The first thing that happens is that it throws exceptions over and over
> > saying that it cannot locate my registrator class, which is located in my
> > assembly jar, like so:
> >
> > 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
> > java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
> >         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >         at java.lang.Class.forName0(Native Method)
> >         at java.lang.Class.forName(Class.java:270)
> >         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
> >         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
> >         at scala.Option.foreach(Option.scala:236)
> >         at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
> >         at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
> >         at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
> >         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
> >         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:415)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> >         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >         at java.lang.Thread.run(Thread.java:724)
> >
> > Now, I would expect it not to be able to find this class since it hasn't
> > yet fetched my assembly jar to the executors. Once it does fetch my jar,
> > those exceptions stop.
> > Next, all the executor tasks die with the following exception:
> >
> > java.nio.ReadOnlyBufferException
> >         at java.nio.ByteBuffer.array(ByteBuffer.java:961)
> >         at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
> >         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
> >         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:415)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> >         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >         at java.lang.Thread.run(Thread.java:724)
> >
> > AFAIK, I'm not doing anything out of the ordinary, just turning on Kryo and
> > using the registrator mechanism to register a couple of custom serializers.
> >
> > The reason I tried turning on Kryo for closures in the first place is
> > because of a different bug that I was hitting during fetching and
> > deserializing of tasks from my executors, which I detailed here:
> >
> > http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
> >
> > Here's hoping someone on this list can help me track down what's happening,
> > as I didn't get a single reply on the user list.
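
For anyone following along, here is roughly the shape of the setup I'm describing above. It's a simplified sketch in Scala rather than my actual Clojure code, and the class and package names are made up; it's only meant to show the registrator hook and the config keys involved:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Illustrative registrator -- my real one is written in Clojure and passes
// custom Serializer instances for Clojure's persistent collections as the
// second argument to register().
class PersistentVectorRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[clojure.lang.PersistentVector])
  }
}

// Config as discussed in the quoted thread: Kryo for data serialization,
// plus the (experimental) closure serializer setting.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "my.pkg.PersistentVectorRegistrator") // made-up package
  .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")

One thing I'm double-checking on my side is that the same registrator runs (and registers classes in the same order) on the driver and the executors, since Kryo identifies registered classes by numeric id.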
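And regarding the java.nio.ReadOnlyBufferException in the second quoted trace: as far as I understand the NIO semantics, ByteBuffer.array() throws that exception whenever the buffer is read-only, so if the Kryo deserialize path is handed a read-only buffer it will die at exactly that call. A tiny sketch of the behaviour and of the kind of defensive copy that avoids it (just my reading of the API, not a claim about what the fix in Spark should be):

import java.nio.ByteBuffer

val readOnly = ByteBuffer.wrap("task result bytes".getBytes("UTF-8")).asReadOnlyBuffer()
// readOnly.array()  // throws java.nio.ReadOnlyBufferException, as in the trace above

// Copying the remaining bytes out works for read-only (and direct) buffers.
def toByteArray(buf: ByteBuffer): Array[Byte] = {
  val dup = buf.duplicate()            // leave the caller's position untouched
  val out = new Array[Byte](dup.remaining())
  dup.get(out)
  out
}

val safeBytes = toByteArray(readOnly)  // bytes like these could then be handed to Kryo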