Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey). I have been
trying to implement it this morning, but I'm having some trouble. I would
love to see your code for the function that goes inside updateStateByKey.

Here is my current code:

    def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                         currentValue: Option[Seq[(String, Long, Long)]]
                        ): Option[Seq[(String, Long, Long)]] = {
      currentValue.map { case (v) => v ++ newValues }
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


However, when I run it, the grouped DStream doesn't get populated with
anything. The issue is probably that currentValue is not actually an
Option[Seq[triple]] but rather an Option[triple]. However, if I change it to
an Option[triple], then I also have to return an Option[triple] for
updateStateByKey to compile, but I want the return value to be an
Option[Seq[triple]] because ultimately I want the data to look like
(IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
startTime, endTime), ...]) and have that Seq build up over time.
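
One possible cause, sketched below in plain Scala (no Spark needed to run it):
the first time a key is seen, the previous state handed to the update function
is None, and calling map on None yields None, so no state is ever stored. The
state type does not have to match the value type, so Option[Seq[triple]] can
stay. This is a minimal sketch, not a confirmed fix; the alias PageHit is a
name introduced here purely for illustration.

```scala
// Hypothetical alias for the (pageRequested, startTime, endTime) triple.
type PageHit = (String, Long, Long)

// Appends the new values for a key to whatever state already exists.
// getOrElse supplies an empty Seq the first time a key is seen, so the
// function returns Some(...) rather than None.
def updateGroupByKey(newValues: Seq[PageHit],
                     currentValue: Option[Seq[PageHit]]): Option[Seq[PageHit]] =
  Some(currentValue.getOrElse(Seq.empty[PageHit]) ++ newValues)

// Simulate two micro-batches arriving for the same key.
val first = updateGroupByKey(Seq(("/test.php", 1418849762000L, 1418849762000L)), None)
val second = updateGroupByKey(Seq(("/index.php", 1418849770000L, 1418849770000L)), first)
```

With the original map-based version, the first call would return None and the
state would never start accumulating.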

Am I thinking about this wrong?

Thank you

On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:
>
> Hi Pierce,
>
> You shouldn’t have to use groupByKey because updateStateByKey will get a
> Seq of all the values for that key already.
>
> I used that for realtime sessionization as well. What I did was key my
> incoming events, then send them to updateStateByKey. The updateStateByKey
> function then received a Seq of the events and the Option of the previous
> state for that key. The sessionization code then did its thing to check if
> the incoming events were part of the same session, based on a configured
> timeout. If a session already was active (from the previous state) and it
> hadn’t exceeded the timeout, it used that value. Otherwise it generated a
> new session id. Then the return value for the updateStateByKey function
> was a Tuple of session id and last timestamp.
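
The approach described above might look roughly like the following in plain
Scala. This is only a sketch under stated assumptions, not the code from the
quoted message: the names SessionState, sessionTimeoutMs, and updateSession,
the 30-minute timeout, and the use of random UUIDs as session ids are all
hypothetical. It also treats all events in one batch as belonging to a single
session, which a real implementation may need to refine.

```scala
import java.util.UUID

// Hypothetical state: (sessionId, lastTimestampMillis).
type SessionState = (String, Long)

// Assumed timeout; the quoted message does not give a value.
val sessionTimeoutMs = 30 * 60 * 1000L

// Same shape as an updateStateByKey function: new values plus previous state.
def updateSession(eventTimes: Seq[Long],
                  previous: Option[SessionState]): Option[SessionState] =
  if (eventTimes.isEmpty) previous // no new events: keep the state unchanged
  else {
    val earliest = eventTimes.min
    val latest = eventTimes.max
    previous match {
      // Active session and the first new event is within the timeout: reuse the id.
      case Some((id, last)) if earliest - last <= sessionTimeoutMs =>
        Some((id, math.max(last, latest)))
      // No previous session, or it has timed out: generate a new session id.
      case _ =>
        Some((UUID.randomUUID().toString, latest))
    }
  }
```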
>
> Then I joined the DStream with the session ids, which were both keyed off
> the same id and continued my processing. Your requirements may be
> different, but that’s what worked well for me.
>
> Another thing to consider is cleaning up old sessions by returning None in
> the updateStateByKey function. This will help with long running apps and
> minimize memory usage (and checkpoint size).
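
A rough sketch of that cleanup in plain Scala, assuming the state is a
(sessionId, lastTimestampMillis) pair. The names Session, idleTimeoutMs, and
expireIdle, the timeout value, and the placeholder session id are hypothetical.
The current time is passed in as a parameter so the function can be tested;
with updateStateByKey it could be wrapped in a two-argument closure that
supplies System.currentTimeMillis().

```scala
// Hypothetical state: (sessionId, lastTimestampMillis).
type Session = (String, Long)

// Assumed idle timeout; not a value from the quoted message.
val idleTimeoutMs = 30 * 60 * 1000L

def expireIdle(eventTimes: Seq[Long],
               previous: Option[Session],
               nowMs: Long): Option[Session] =
  previous match {
    // No new events and idle for longer than the timeout: return None,
    // which tells updateStateByKey to drop the state for this key.
    case Some((_, last)) if eventTimes.isEmpty && nowMs - last > idleTimeoutMs =>
      None
    // Otherwise keep the session, advancing the timestamp if events arrived.
    case Some((id, last)) =>
      Some((id, (last +: eventTimes).max))
    // First events for this key: start a session with a placeholder id.
    case None =>
      eventTimes.reduceOption(_ max _).map(t => ("session-" + t, t))
  }
```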
>
> I was using something similar to the method above on a live production
> stream with very little CPU and memory footprint, running for weeks at a
> time, processing up to 15M events per day with fluctuating traffic.
>
> Thanks,
> Silvio
>
>
>
> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
> wrote:
>
> >I am trying to run stateful Spark Streaming computations over (fake)
> >apache web server logs read from Kafka. The goal is to "sessionize"
> >the web traffic similar to this blog post:
> >
> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
> >
> >The only difference is that I want to "sessionize" each page the IP
> >hits, instead of the entire session. I was able to do this reading
> >from a file of fake web traffic using Spark in batch mode, but now I
> >want to do it in a streaming context.
> >
> >Log files are read from Kafka and parsed into K/V pairs of
> >
> >(String, (String, Long, Long)) or
> >
> >(IP, (requestPage, time, time))
> >
> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
> >produce a:
> >
> >(String, CollectionBuffer((String, Long, Long), ...)) or
> >
> >(IP, CollectionBuffer((requestPage, time, time), ...))
> >
> >In a StreamingContext, it produces a:
> >
> >(String, ArrayBuffer((String, Long, Long), ...)) like so:
> >
> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
> >
> >However, as the next microbatch (DStream) arrives, this information is
> >discarded. Ultimately what I want is for that ArrayBuffer to fill up
> >over time as a given IP continues to interact and to run some
> >computations on its data to "sessionize" the page time. I believe the
> >operator to make that happen is "updateStateByKey." I'm having some
> >trouble with this operator (I'm new to both Spark & Scala); any help
> >is appreciated.
> >
> >Thus far:
> >
> >    val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
> >
> >
> >    def updateGroupByKey(
> >        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
> >        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
> >    ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
> >
> >    }
> >
> >---------------------------------------------------------------------
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>
