Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey). I have been trying to implement it this morning but am having some trouble. I would love to see your code for the function that goes inside updateStateByKey.
Here is my current code:

    def updateGroupByKey(
        newValues: Seq[(String, Long, Long)],
        currentValue: Option[Seq[(String, Long, Long)]]
    ): Option[Seq[(String, Long, Long)]] = {
      currentValue.map { case (v) => v ++ newValues }
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)

However, when I run it the grouped DStream doesn't get populated with
anything. The issue is probably that currentValue is not actually an
Option[Seq[triple]] but rather an Option[triple]. However, if I change it
to an Option[triple], then I also have to return an Option[triple] for
updateStateByKey to compile, but I want that return value to be an
Option[Seq[triple]], because ultimately I want the data to look like

    (IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested, startTime, endTime), ...])

and have that Seq build up over time.

Am I thinking about this wrong?

Thank you

On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>
> Hi Pierce,
>
> You shouldn't have to use groupByKey, because updateStateByKey will get a
> Seq of all the values for that key already.
>
> I used that for realtime sessionization as well. What I did was key my
> incoming events, then send them to updateStateByKey. The updateStateByKey
> function then received a Seq of the events and the Option of the previous
> state for that key. The sessionization code then did its thing to check if
> the incoming events were part of the same session, based on a configured
> timeout. If a session was already active (from the previous state) and it
> hadn't exceeded the timeout, it used that value. Otherwise, it generated a
> new session id. The return value of the updateStateByKey function was then
> a tuple of session id and last timestamp.
>
> Then I joined the DStream with the session ids, which were both keyed off
> the same id, and continued my processing. Your requirements may be
> different, but that's what worked well for me.
>
> Another thing to consider is cleaning up old sessions by returning None in
> the updateStateByKey function. This will help with long-running apps and
> minimize memory usage (and checkpoint size).
>
> I was using something similar to the method above on a live production
> stream with very little CPU and memory footprint, running for weeks at a
> time, processing up to 15M events per day with fluctuating traffic.
>
> Thanks,
> Silvio
>
>
>
> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
> wrote:
>
> >I am trying to run stateful Spark Streaming computations over (fake)
> >Apache web server logs read from Kafka. The goal is to "sessionize"
> >the web traffic similar to this blog post:
> >
> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
> >
> >The only difference is that I want to "sessionize" each page the IP
> >hits, instead of the entire session. I was able to do this reading
> >from a file of fake web traffic using Spark in batch mode, but now I
> >want to do it in a streaming context.
> >
> >Log files are read from Kafka and parsed into K/V pairs of
> >
> >(String, (String, Long, Long)) or
> >
> >(IP, (requestPage, time, time))
> >
> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
> >produce a:
> >
> >(String, CollectionBuffer((String, Long, Long), ...)) or
> >
> >(IP, CollectionBuffer((requestPage, time, time), ...))
> >
> >In a StreamingContext, it produces a:
> >
> >(String, ArrayBuffer((String, Long, Long), ...))
> >
> >like so:
> >
> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
> >
> >However, as the next microbatch (DStream) arrives, this information is
> >discarded. Ultimately, what I want is for that ArrayBuffer to fill up
> >over time as a given IP continues to interact, and to run some
> >computations on its data to "sessionize" the page time. I believe the
> >operator to make that happen is "updateStateByKey." I'm having some
> >trouble with this operator (I'm new to both Spark & Scala); any help
> >is appreciated.
> >
> >Thus far:
> >
> >    val grouped =
> >      ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
> >
> >    def updateGroupByKey(
> >        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
> >        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
> >    ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
> >
> >    }
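
For reference, a minimal sketch of the kind of update function being asked
about above, assuming the stream is keyed as (IP, (requestPage, startTime,
endTime)) and the desired state is a growing Seq of those triples per IP.
The seeding of the empty initial state with Seq.empty, the parameter name
"state", and the type annotations are illustrative assumptions, not code
from the thread:

    import org.apache.spark.streaming.StreamingContext._  // pair-DStream ops (needed on Spark < 1.3)
    import org.apache.spark.streaming.dstream.DStream

    // ipTimeStamp: DStream[(String, (String, Long, Long))]
    //   key   = IP address
    //   value = (requestPage, startTime, endTime)

    // Called once per key per batch with that key's new values and its
    // previous state (None the first time the key is seen).
    def updateGroupByKey(
        newValues: Seq[(String, Long, Long)],
        state: Option[Seq[(String, Long, Long)]]
    ): Option[Seq[(String, Long, Long)]] = {
      // Seed with an empty Seq when there is no previous state; a bare
      // state.map(...) would return None on the first batch and the key
      // would never be stored.
      Some(state.getOrElse(Seq.empty) ++ newValues)
    }

    // No groupByKey needed: updateStateByKey already hands the update
    // function all of a key's values for the current batch.
    val grouped: DStream[(String, Seq[(String, Long, Long)])] =
      ipTimeStamp.updateStateByKey(updateGroupByKey)

Note that updateStateByKey requires checkpointing to be enabled on the
StreamingContext (ssc.checkpoint(...)).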
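
And a similarly hedged sketch of the session-id style Silvio describes,
where the state per key is just (sessionId, lastTimestamp) and stale
sessions are dropped by returning None; the 30-minute timeout, the use of
wall-clock time for expiry, and the UUID session ids are assumptions for
illustration, not Silvio's actual code:

    import java.util.UUID

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed 30-minute timeout

    // events: (requestPage, startTime, endTime); state: (sessionId, lastTimestamp)
    def updateSession(
        events: Seq[(String, Long, Long)],
        state: Option[(String, Long)]
    ): Option[(String, Long)] = {
      val now = System.currentTimeMillis()
      val newLast =
        if (events.isEmpty) state.map(_._2).getOrElse(now)
        else events.map(_._3).max

      state match {
        // Existing session still within the timeout: keep its id, bump the timestamp.
        case Some((id, last)) if now - last <= sessionTimeoutMs =>
          Some((id, newLast))
        // Old session timed out and nothing new arrived: drop the state
        // entirely (keeps memory and checkpoint size down).
        case Some(_) if events.isEmpty =>
          None
        // New events for a timed-out or never-seen key: start a new session.
        case _ if events.nonEmpty =>
          Some((UUID.randomUUID().toString, newLast))
        // No previous state and no events: nothing to keep.
        case _ =>
          None
      }
    }

    // Keyed the same way as the event stream, so it can be joined back
    // against the events for further processing, as described above.
    val sessions = ipTimeStamp.updateStateByKey(updateSession)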