No.

It was a logical error.

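The event1 RDD should have mapped each user_id to 0, not 1:

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache

With both RDDs emitting 1, no group ever contained a 0, so the final filter rejected everything.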
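
The effect can be reproduced without a cluster. Below is a minimal sketch that uses plain Scala collections as a stand-in for the RDDs; the object name, the helper retained, and the user ids are made up for illustration and are not part of the real job. It mirrors the union().groupByKey().filter(...) step and shows the buggy tagging next to the intended one.

```scala
// Plain Scala collections standing in for RDDs; all names and ids are hypothetical.
object RetentionSketch {
  type Tagged = Seq[(String, Int)]

  // Mirrors union().groupByKey().filter(...): keep users that appear more
  // than once and that carry at least one 0 tag (i.e. were seen in event1).
  def retained(ev1: Tagged, ev2: Tagged): Set[String] =
    (ev1 ++ ev2)
      .groupBy(_._1)
      .collect { case (user, pairs) if pairs.length > 1 && pairs.exists(_._2 == 0) => user }
      .toSet

  def main(args: Array[String]): Unit = {
    val ev2   = Seq("u2" -> 1, "u3" -> 1)
    val buggy = Seq("u1" -> 1, "u2" -> 1) // event1 tagged with 1 (the bug)
    val fixed = Seq("u1" -> 0, "u2" -> 0) // event1 tagged with 0 (the fix)

    println(retained(buggy, ev2).size) // prints 0
    println(retained(fixed, ev2))      // prints Set(u2)
  }
}
```

With the buggy tagging, u2 still shows up twice after the union, but neither entry is a 0, so the count is 0, which matches the List(0, 0) output.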

I have had the most awful time figuring out these "looped" things. It seems next to impossible to run a .filter() operation inside a for loop; it only seems to work if you yield the .filter() result.

Still don't understand why that is...
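
One plausible explanation, sketched with plain Scala lists instead of RDDs (the object name and the sample events are assumptions for illustration only): the compiler rewrites a for without yield into foreach, so whatever .filter() returns is thrown away, while a for with yield is rewritten into map and keeps the results. RDD transformations such as filter return a new RDD rather than modifying the existing one, so the same reasoning would apply there.

```scala
// Plain lists stand in for RDDs; the data below is invented for the example.
object ForYieldSketch {
  val eventsByDay: Map[Int, List[String]] = Map(
    1 -> List("login", "purchase"),
    2 -> List("purchase", "login", "login")
  )

  // Without yield, the for desugars to foreach: each filter result is
  // computed and discarded, and the whole expression has type Unit.
  def withoutYield(days: List[Int]): Unit =
    for (d <- days) {
      eventsByDay(d).filter(_ == "login") // result discarded
    }

  // With yield, the for desugars to map: one filtered list per day is
  // collected into the resulting List.
  def withYield(days: List[Int]): List[List[String]] =
    for (d <- days) yield eventsByDay(d).filter(_ == "login")

  def main(args: Array[String]): Unit = {
    println(withoutYield(List(1, 2))) // prints ()
    println(withYield(List(1, 2)))    // prints List(List(login), List(login, login))
  }
}
```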

Ognen

On 3/7/14, 1:05 PM, Mayur Rustagi wrote:
Was the issue with print?
Printing on the worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski wrote:

    Strike that. Figured it out. Don't you just hate it when you fire
    off an email and you figure it out as it is being sent? ;)
    Ognen


    On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

        What is wrong with this code?

        A condensed set of this code works in the spark-shell.

        It does not work when deployed via a jar.

        def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {
            val spd = new PipelineDate(start)
            val epd = new PipelineDate(end)
            // filter for event1 events and return RDDs that are maps of user_ids and 0
            val f = sc.textFile(spd.toJsonHdfsFileName)
            val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
                          .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
            val ev1c = ev1rdd.count.toDouble

            // do the same as above for event2 events, only substitute 0s with 1s
            val ev2rdds = for {
               dt <- PipelineDate.getPeriod(spd+1,epd)
               val f1 = sc.textFile(dt.toJsonHdfsFileName)
            } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
                       .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)

            // cache all event1 and event2 RDDs
            ev2rdds.foreach(_.cache)
            val cts = for {
              ev2 <- ev2rdds
            } yield ev2.count

            val retent = for {
              ev2rdd <- ev2rdds
              val ret = ev1rdd.union(ev2rdd).groupByKey()
            } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)

            val rcts = retent.map(_.count)
            println("----------------------------------------------------------------------")

            println(s"${rcts}")
            println(s"${cts}")

            for {
              c <- rcts
            } yield(ev1c/c.toDouble)
            //Map(result:_*)
          }

        This is what this code prints:
        List(0, 0)
        List(785912, 825254)
        List(Infinity, Infinity)

        My question is: it does not appear that the
        union().groupBy().filter(....) segment is working (the
        List(0,0) output). The app is not failing, it finishes just fine.

        Any ideas?
        Ognen


-- Some people, when confronted with a problem, think "I know, I'll
    use regular expressions." Now they have two problems.
    -- Jamie Zawinski



