It's in the data serialization section of the tuning guide, here: http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization
On Mon, Feb 24, 2014 at 7:44 PM, Soumya Simanta <[email protected]>wrote: > Thanks Andrew. I was expecting this to be the issue. > Are there any pointers about how to change the serialization to Kryo ? > > > > > On Mon, Feb 24, 2014 at 10:17 PM, Andrew Ash <[email protected]> wrote: > >> This is because Joda's DateTimeFormatter is not serializable (doesn't >> implement the empty Serializable interface) >> http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html >> >> One ugly thing I've done before is to instantiate a new DateTimeFormatter >> in every line, so like this: >> >> myRDD.filter(x => >> DateTimeFormat.forPattern("YYYY-mm-dd").parseString(x.getCreatedAt).isAfter(start) >> ).count >> >> It's very inefficient but it gets things closer to working. >> >> Another thing to try is to switch to using Kryo serialization instead of >> the default Java serialization, which I think did handle DTF formatting >> correctly. Back in 0.7.x days though, there was an issue where some of the >> Joda libraries wouldn't correctly serialize with Kryo, but I think that's >> since been fixed: >> https://groups.google.com/forum/#!topic/cascalog-user/35cdnNIamKU >> >> HTH, >> Andrew >> >> >> On Mon, Feb 24, 2014 at 6:57 PM, Soumya Simanta <[email protected] >> > wrote: >> >>> I want to filter a RDD by comparing dates. >>> >>> myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count >>> >>> >>> I'm using the JodaTime library but I get an exception about a Jodatime >>> class not serializable. >>> >>> Is there a way to configure this or an easier alternative for this >>> problem. >>> >>> >>> org.apache.spark.SparkException: Job aborted: Task not serializable: >>> java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter >>> >>> at >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) >>> >>> at >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) >>> >>> at >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>> >>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>> >>> at org.apache.spark.scheduler.DAGScheduler.org >>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) >>> >>> at org.apache.spark.scheduler.DAGScheduler.org >>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) >>> >>> at org.apache.spark.scheduler.DAGScheduler.org >>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) >>> >>> at >>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) >>> >>> at >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) >>> >>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) >>> >>> at akka.actor.ActorCell.invoke(ActorCell.scala:456) >>> >>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) >>> >>> at akka.dispatch.Mailbox.run(Mailbox.scala:219) >>> >>> at >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) >>> >>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> >> >> >
