Most likely the job you are executing is not serializable. This typically happens when a closure references a library class (or an enclosing object) that is not serializable. Are you using any library like Joda-Time?
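For what it's worth, the stack trace below points at com.github.ognenpv.pipeline.CountActor itself being serialized: any lambda passed to filter()/map() that touches a field or method of the enclosing actor captures `this`, and an Akka actor is never serializable. Here is a minimal sketch of the usual workaround (the class, field and path names are illustrative, not taken from the real CountActor): copy whatever the lambda needs into local vals before building the RDD pipeline.

import akka.actor.Actor
import org.apache.spark.SparkContext

// Illustrative sketch only. The pattern that triggers
// NotSerializableException is an RDD lambda that reads a field or calls a
// method of the enclosing actor: the lambda then captures `this`, and the
// actor cannot be shipped to executors.
class CountActor(sc: SparkContext) extends Actor {

  val eventName = "event1"          // a field on the non-serializable actor

  def receive: Receive = {
    case "count" =>
      // BAD: referencing eventName directly inside filter() captures `this`:
      //   sc.textFile("hdfs:///events/day1.json").filter(_.contains(eventName)).count()

      // GOOD: copy the field into a local val first; the closure then only
      // captures a String, which is serializable.
      val name  = eventName
      val count = sc.textFile("hdfs:///events/day1.json")   // hypothetical path
                    .filter(_.contains(name))
                    .count()
      sender() ! count
  }
}

Alternatively, move the parsing helpers into a small top-level object so the closures reference nothing on the actor at all (see the sketch at the end of the thread).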
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski <og...@plainvanillagames.com> wrote:

> It looks like the problem is in the filter task - is there anything
> special about filter()?
>
> I have removed the filter line from the loops just to see if things will
> work and they do.
>
> Anyone has any ideas?
>
> Thanks!
> Ognen
>
>
> On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:
>
>> Hello,
>>
>> What is the general approach people take when trying to do analysis
>> across multiple large files where the data to be extracted from a
>> successive file depends on the data extracted from a previous file or
>> set of files?
>>
>> For example, I have the following: a group of HDFS files, each 20+ GB in
>> size. I need to extract event1 on day 1 from the first file and extract
>> event2 from all remaining files in a period of successive dates, then do
>> a calculation on the two events. I then need to move on to day 2, extract
>> event1 (with certain properties), take all following days, extract event2
>> and run a calculation against the previous day for all days in the
>> period. And so on.
>>
>> I have verified that the following (very naive) approach doesn't work:
>>
>> def calcSimpleRetention(start: String, end: String,
>>                         event1: String, event2: String): Map[String, List[Double]] = {
>>   val epd = new PipelineDate(end)
>>   val result = for {
>>     dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
>>     val f1 = sc.textFile(dt1.toJsonHdfsFileName)
>>     val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
>>                .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
>>     val c = e1.count.toDouble
>>
>>     val intres = for {
>>       dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
>>       val f2 = sc.textFile(dt2.toJsonHdfsFileName)
>>       val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
>>                  .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
>>       val e1e2 = e1.union(e2)
>>       val r = e1e2.groupByKey().filter(e => e._2.length > 1 &&
>>                 e._2.filter(_ == 0).length > 0).count.toDouble
>>     } yield (c / r) // get the retention rate
>>   } yield (dt1.toString -> intres)
>>   Map(result: _*)
>> }
>>
>> I am getting the following errors:
>>
>> 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
>> 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
>> 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
>> 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
>> 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
>> 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
>> 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
>> 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
>> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>   at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I should mention that this code is fired off from an Akka actor (which is
>> controlled by a Scalatra servlet).
>>
>> Any ideas, recommendations, etc.? I am fairly new to Scala and M/R
>> principles in general; it is fair to say that at this point I am still
>> thinking from the point of view of an imperative programmer trying to fit
>> a square peg through a round hole ;)
>> Ognen
>
> --
> Some people, when confronted with a problem, think "I know, I'll use
> regular expressions." Now they have two problems.
>   -- Jamie Zawinski
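As an aside, here is one way the quoted calcSimpleRetention could be restructured so that its closures capture only local, serializable values. This is a sketch, not a drop-in replacement: it keeps the thread's own logic and assumes the same PipelineDate helper, SparkContext and comma-separated line layout from the quoted code.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD operations such as groupByKey

// Sketch only. Putting the parsing helpers on a small Serializable object
// (instead of on the actor) means the RDD closures never reference the
// calling actor instance.
object RetentionJobs extends Serializable {

  private def eventKey(line: String): String =
    line.split(",")(0).split(":")(1).replace("\"", "")

  private def userId(line: String): String =
    line.split(",")(2).split(":")(1).replace("\"", "")

  def calcSimpleRetention(sc: SparkContext, start: String, end: String,
                          event1: String, event2: String): Map[String, List[Double]] = {
    val epd = new PipelineDate(end)
    val result = for (dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)) yield {
      // event1 occurrences on day dt1, keyed by user id
      val e1 = sc.textFile(dt1.toJsonHdfsFileName)
                 .filter(line => eventKey(line) == event1)
                 .map(line => (userId(line), 0))
                 .cache()
      val c = e1.count().toDouble

      val intres = for (dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)) yield {
        // event2 occurrences on a later day dt2
        val e2 = sc.textFile(dt2.toJsonHdfsFileName)
                   .filter(line => eventKey(line) == event2)
                   .map(line => (userId(line), 1))
        // users seen for event1 on dt1 and event2 on dt2
        val r = e1.union(e2)
                  .groupByKey()
                  .filter { case (_, vs) => vs.size > 1 && vs.exists(_ == 0) }
                  .count()
                  .toDouble
        c / r // same retention ratio as the original code
      }
      dt1.toString -> intres
    }
    Map(result: _*)
  }
}

The key change is that the job takes the SparkContext and the event names as plain parameters and calls helpers on a Serializable object, so nothing from the enclosing actor (or servlet) leaks into the closures Spark ships to the executors.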