Hi Fabrizio,

> Can someone explain why I get a SparkConf not serializable error?
>

First, SparkConf is not serializable, which is exactly what the exception
tells you. Why do you run into it? Because something your closure needs
holds a reference that eventually leads to a SparkConf. In your case, that
happens because you define Custom and CustomOrdering inside the test class.
Spark serializes everything your closure depends on, and here Custom and
CustomOrdering depend on RDDSuite, which in turn depends on
SharedSparkContext. So the reference chain is Custom, CustomOrdering ->
RDDSuite -> SharedSparkContext -> SparkContext -> SparkConf, where each
arrow means "holds a reference to". In other words, your code does ask
Spark to serialize a SparkConf, and SparkConf is not serializable in the
current implementation. The solution is simple: move Custom and
CustomOrdering out of the RDDSuite class (for example, to the top level of
the file), and that's it.
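To see the reference-chain effect without Spark, here is a minimal sketch using plain Java serialization. FakeConf, FakeSuite, InnerOrdering, TopLevelOrdering and SerializationChainDemo are hypothetical stand-ins (for SparkConf, for RDDSuite with SharedSparkContext, and for CustomOrdering); the sketch only mirrors the chain and does not use Spark's own closure serializer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkConf: deliberately NOT Serializable.
class FakeConf

// Hypothetical stand-in for RDDSuite + SharedSparkContext: the suite itself
// is Serializable, but it holds a field that is not.
class FakeSuite extends java.io.Serializable {
  val conf = new FakeConf
  val reverse = false

  // Ordering defined inside the suite, like CustomOrdering inside RDDSuite.
  // Reading `reverse` goes through the hidden outer pointer, so every
  // InnerOrdering instance drags its enclosing FakeSuite (and its FakeConf) along.
  class InnerOrdering extends Ordering[Int] {
    def compare(a: Int, b: Int): Int = if (reverse) b compare a else a compare b
  }
}

// The same ordering at the top level: no outer pointer, nothing extra to serialize.
object TopLevelOrdering extends Ordering[Int] {
  def compare(a: Int, b: Int): Int = a compare b
}

object SerializationChainDemo {
  // Returns true if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val suite = new FakeSuite
    println(serializes(new suite.InnerOrdering)) // false: fails on FakeConf
    println(serializes(TopLevelOrdering))        // true
  }
}
```

Defining the ordering at the top level, as TopLevelOrdering does, removes the outer pointer, which is the same idea as moving Custom and CustomOrdering out of RDDSuite.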

Thanks
-Shengzhe

On Sat, Feb 22, 2014 at 5:35 PM, Fabrizio Milo aka misto <
mistob...@gmail.com> wrote:

> Hello Spark Developers,
>
> While trying to use the takeOrdered method of RDD in the following way:
>
>   object AceScoreOrdering extends Ordering[Record] {
>     def compare(a: Record, b: Record) = a.score.ace_score compare b.score.ace_score
>   }
>
>   val collected = dataset.takeOrdered(topN)(AceScoreOrdering)
>
> I got this error:
>
> 14/02/22 09:11:53 ERROR actor.OneForOneStrategy: scala.collection.immutable.Nil$ cannot be cast to org.apache.spark.util.BoundedPriorityQueue
> java.lang.ClassCastException: scala.collection.immutable.Nil$ cannot be cast to org.apache.spark.util.BoundedPriorityQueue
> at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:941)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:727)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:724)
> at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:843)
> at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:598)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>
> The error happens in this piece of code (this is from today's tip of
> the repository on GitHub):
>
>   def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
>     mapPartitions { items =>
>       val queue = new BoundedPriorityQueue[T](num)
>       queue ++= items
>       Iterator.single(queue)
>     }.reduce { (queue1, queue2) =>
>       queue1 ++= queue2
>       queue1
>     }.toArray.sorted(ord.reverse)
>   }
>
> I am not a Scala expert, but it looks like when one of the partitions
> returns a completely empty collection (scala.collection.immutable.Nil?),
> the system is not able to reduce it into a queue.
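To make the structure of `top` easier to reason about, here is a minimal sketch that emulates it on plain Scala collections, with no Spark involved. BoundedQueue and TopEmulation are hypothetical names: BoundedQueue is an assumed stand-in for Spark's internal BoundedPriorityQueue, built on scala.collection.mutable.PriorityQueue, and each inner Seq plays the role of one partition. In this emulation every partition emits exactly one queue (as Iterator.single does in the quoted code), so an empty partition simply contributes an empty queue to the reduce step; the sketch does not reproduce the ClassCastException itself.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's internal BoundedPriorityQueue:
// retains at most `k` elements, keeping the k largest under `ord`.
final class BoundedQueue[T](k: Int)(implicit ord: Ordering[T]) {
  // Ordered by ord.reverse, so the heap's head is the smallest retained element.
  private val heap = mutable.PriorityQueue.empty[T](ord.reverse)

  def add(x: T): BoundedQueue[T] = {
    heap.enqueue(x)
    if (heap.size > k) heap.dequeue() // evict the current smallest element
    this
  }

  def addAll(xs: Iterator[T]): BoundedQueue[T] = {
    xs.foreach(x => add(x))
    this
  }

  // Folds another queue into this one (the body of the reduce step).
  def merge(other: BoundedQueue[T]): BoundedQueue[T] = addAll(other.heap.iterator)

  def toList: List[T] = heap.toList
}

object TopEmulation {
  // Same shape as RDD.top: one bounded queue per partition, then a reduce.
  // Like Iterator.single(queue), every partition yields exactly one queue,
  // even when it holds no items. Throws if there are no partitions at all.
  def top[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): List[T] = {
    val perPartition: Seq[BoundedQueue[T]] =
      partitions.map(items => new BoundedQueue[T](num).addAll(items.iterator))
    perPartition.reduce((q1, q2) => q1.merge(q2)).toList.sorted(ord.reverse)
  }

  def main(args: Array[String]): Unit = {
    // The middle partition is empty.
    println(top(Seq(Seq(3, 1, 4), Seq.empty[Int], Seq(5, 9, 2)), 3)) // List(9, 5, 4)
  }
}
```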
>
> Now, the real question: I was trying to reproduce this behavior with a
> simple test inside RDDSuite.scala:
>
>
> test("takeOrdered with nil partition") {
>   case class Custom(value: Int) extends Serializable
>   object CustomOrdering extends Ordering[Custom] {
>     def compare(a: Custom, b: Custom) = a.value compare b.value
>   }
>   val nums = Array(Custom(1), Custom(2))
>   val rdd = sc.makeRDD(nums, 2)
>   val sortedTopK = rdd.takeOrdered(3)(CustomOrdering)
>   assert(sortedTopK.size === 2)
>   assert(sortedTopK === Array(Custom(1), Custom(2)))
>   assert(sortedTopK === nums.sorted(CustomOrdering).take(2))
> }
>
>
> But out of nowhere I get this error:
>
> Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkConf
> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkConf
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:778)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:721)
> at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:551)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Can someone explain why I get a SparkConf not serializable error?
> Where does it come from?
>
> Thank you for your time
>
> Fabrizio
> --
> LinkedIn: http://linkedin.com/in/fmilo
> Twitter: @fabmilo
> Github: http://github.com/Mistobaan/
> -----------------------
> Simplicity, consistency, and repetition - that's how you get through.
> (Jack Welch)
> Perfection must be reached by degrees; she requires the slow hand of
> time (Voltaire)
> The best way to predict the future is to invent it (Alan Kay)
>
