Mayur, I had not thought of that. Yes, I use Joda-Time. What is the scope that this serialization issue applies to? Only the method making a call into such a library? The whole class that the method belongs to? Sorry if it is a dumb question :)
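
(For reference, a minimal sketch of the scope, with hypothetical class names: Spark ships each task's closure together with everything the closure references. A closure that touches a field or method of its enclosing class is compiled with a reference to `this`, so the whole instance gets serialized; a closure that only touches local vals does not.)

    import org.apache.spark.SparkContext

    // Hypothetical class; all that matters is that it does not extend Serializable.
    class Analysis(sc: SparkContext) {
      val threshold = 10

      def bad(): Long = {
        val rdd = sc.parallelize(1 to 100)
        // `threshold` is really `this.threshold`, so the closure captures the
        // whole non-serializable Analysis instance -> Task not serializable
        rdd.filter(_ > threshold).count()
      }

      def good(): Long = {
        val rdd = sc.parallelize(1 to 100)
        val t = threshold // copy the field into a local val first
        rdd.filter(_ > t).count() // the closure now carries only the Int `t`
      }
    }

So the unit is the closure, but through `this`-capture it can effectively become the whole class.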

Ognen

On 3/7/14, 1:29 PM, Mayur Rustagi wrote:
Most likely the job you are executing is not serializable; this typically happens when you use a library that is not serializable. Are you using any library like Joda-Time?
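
(Illustration only, not Mayur's code: `LegacyParser` below is a made-up stand-in for any library class that does not implement Serializable. The usual workaround is to construct such objects inside the task rather than capture them from the driver.)

    import org.apache.spark.SparkContext

    object LibraryCaptureDemo {
      // stand-in for a third-party class that does not extend Serializable
      class LegacyParser { def parse(s: String): Long = s.trim.toLong }

      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext("local", "demo")
        val rdd = sc.parallelize(Seq("1", " 2 ", "3"))

        // val parser = new LegacyParser
        // rdd.map(parser.parse)       // captures `parser` -> Task not serializable

        // Build the object on the executor instead; mapPartitions constructs
        // it once per partition rather than once per record:
        val parsed = rdd.mapPartitions { it =>
          val p = new LegacyParser     // created executor-side, never shipped
          it.map(p.parse)
        }
        println(parsed.collect().toList) // List(1, 2, 3)
        sc.stop()
      }
    }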

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski <og...@plainvanillagames.com> wrote:

    It looks like the problem is in the filter task. Is there
    anything special about filter()?

    I have removed the filter line from the loops just to see
    whether things would work, and they do.

    Does anyone have any ideas?
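
    (For what it's worth, a sketch with made-up data suggesting filter()
    itself is not special: its predicate is serialized and shipped exactly
    like a map function. A predicate that closes only over local values
    ships fine; one that references a field or method of a non-serializable
    enclosing class is what typically fails.)

        import org.apache.spark.SparkContext

        object FilterClosureDemo {
          def main(args: Array[String]): Unit = {
            val sc    = new SparkContext("local", "demo")
            val lines = sc.parallelize(Seq("\"event1\",x,\"u1\"", "\"event2\",y,\"u2\""))

            val wanted = "event1" // local val: the task carries only this String
            val hits   = lines.filter(_.split(",")(0).replace("\"", "") == wanted).count()
            println(hits)         // 1

            sc.stop()
          }
        }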

    Thanks!
    Ognen


    On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

        Hello,

        What is the general approach people take when trying to do
        analysis across multiple large files where the data to be
        extracted from a successive file depends on the data extracted
        from a previous file or set of files?

        For example, I have a group of HDFS files, each 20+ GB in
        size. I need to extract event1 on day 1 from the first file,
        then extract event2 from all remaining files over a period of
        successive dates, and do a calculation on the two events.
        I then need to move on to day 2, extract event1 (with certain
        properties), take all following days, extract event2, and run
        a calculation against the previous day for all days in the
        period. And so on.

        I have verified that the following (very naive) approach
        doesn't work:

        def calcSimpleRetention(start: String, end: String,
                                event1: String, event2: String): Map[String, List[Double]] = {
          val epd = new PipelineDate(end)
          val result = for {
            dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
            val f1 = sc.textFile(dt1.toJsonHdfsFileName)
            val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
                       .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
            val c = e1.count.toDouble

            val intres = for {
              dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
              val f2 = sc.textFile(dt2.toJsonHdfsFileName)
              val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
                         .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
              val e1e2 = e1.union(e2)
              val r = e1e2.groupByKey()
                          .filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)
                          .count.toDouble
            } yield (c / r) // get the retention rate
          } yield (dt1.toString -> intres)
          Map(result: _*)
        }
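
        (An aside on the parsing: the split/replace chain above repeats
        four times. A sketch, with a hypothetical EventLines object, of
        hoisting it out; methods of a top-level object are resolved
        statically on the executors, so calling them inside RDD closures
        adds nothing to the serialized task.)

        object EventLines {
          // mirrors the original parsing: field 0 carries "event":"name",
          // field 2 carries the user id, with surrounding quotes stripped
          def eventName(line: String): String =
            line.split(",")(0).split(":")(1).replace("\"", "")
          def userId(line: String): String =
            line.split(",")(2).split(":")(1).replace("\"", "")
        }

        // e.g. inside the loop above:
        // val e1 = f1.filter(line => EventLines.eventName(line) == event1)
        //            .map(line => (EventLines.userId(line), 0)).cache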

        I am getting the following errors:
        14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
        14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
        14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
        14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
        14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
        14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
        14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
        14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
        org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
            at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
            at akka.actor.ActorCell.invoke(ActorCell.scala:456)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
            at akka.dispatch.Mailbox.run(Mailbox.scala:219)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

        I should mention that this code is fired off from an Akka
        actor (which is controlled by a Scalatra servlet).
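
        (The trace above shows com.github.ognenpv.pipeline.CountActor
        itself being pulled into a task, and Akka actors are not
        serializable. One common arrangement, sketched here with
        hypothetical names and reusing the EventLines helper above: keep
        every RDD closure in a plain top-level object and let the actor
        only orchestrate.)

        import akka.actor.Actor
        import org.apache.spark.SparkContext

        object Retention {
          // all RDD closures live here; nothing references the actor
          def countEvent(sc: SparkContext, path: String, event: String): Double =
            sc.textFile(path)
              .filter(line => EventLines.eventName(line) == event)
              .count.toDouble
        }

        class CountActor(sc: SparkContext) extends Actor {
          def receive = {
            case path: String =>
              // the actor never appears inside a Spark closure
              sender ! Retention.countEvent(sc, path, "event1")
          }
        }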

        Any ideas, recommendations, etc.? I am fairly new to Scala and
        M/R principles in general; it is fair to say that at this point
        I am still thinking from the point of view of an imperative
        programmer trying to fit a square peg into a round hole ;)
        Ognen


--
Some people, when confronted with a problem, think "I know, I'll
use regular expressions." Now they have two problems.
    -- Jamie Zawinski


