I am trying to run stateful Spark Streaming computations over (fake) Apache web server logs read from Kafka. The goal is to "sessionize" the web traffic, similar to what is done in this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
The only difference is that I want to "sessionize" each page the IP hits, instead of the entire session. I was able to do this when reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context.

Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)), or (IP, (requestPage, time, time)). I then call groupByKey() on these pairs. In batch mode, this produces a:

(String, CollectionBuffer((String, Long, Long), ...)) or (IP, CollectionBuffer((requestPage, time, time), ...))

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...))

like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is discarded.

Ultimately, what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to "sessionize" the time spent on each page. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated.

Thus far:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
  ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
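For completeness, the upstream parsing step looks roughly like this (an untested sketch: the log format, the regex, and the kafkaLines/parseLogLine names are placeholders; only the (IP, (requestPage, time, time)) target shape is what my pipeline actually produces):

import java.text.SimpleDateFormat
import java.util.Locale

// Hypothetical parser for a Common Log Format-style line, e.g.
// 127.0.0.1 - - [17/Dec/2014:20:56:02 +0000] "GET /test.php HTTP/1.1" 200 1234
def parseLogLine(line: String): Option[(String, (String, Long, Long))] = {
  val pattern = """^(\S+) \S+ \S+ \[([^\]]+)\] "\S+ (\S+).*""".r
  val fmt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)
  line match {
    case pattern(ip, timestamp, page) =>
      val time = fmt.parse(timestamp).getTime   // epoch millis
      Some((ip, (page, time, time)))            // (IP, (requestPage, time, time))
    case _ => None                              // drop lines that fail to parse
  }
}

// Applied to the raw DStream[String] coming out of Kafka (name assumed):
// val ipTimeStamp = kafkaLines.flatMap(line => parseLogLine(line))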
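Based on re-reading the API docs, I think the update function is supposed to operate only on the values for a single key (Spark tracks the key itself), and that groupByKey() shouldn't be needed first. Below is my untested sketch of that wiring; updateSession and sessions are just placeholder names, and ipTimeStamp is the parsed DStream from above:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.StreamingContext._   // pair DStream operations (needed on older Spark versions)

// newHits: this batch's (requestPage, time, time) values for one IP
// state:   the ArrayBuffer accumulated for that IP so far, if any
def updateSession(
    newHits: Seq[(String, Long, Long)],
    state: Option[ArrayBuffer[(String, Long, Long)]]
  ): Option[ArrayBuffer[(String, Long, Long)]] = {
  val buffer = state.getOrElse(ArrayBuffer.empty[(String, Long, Long)])
  buffer ++= newHits   // keep filling the buffer as the IP keeps interacting
  Some(buffer)         // returning None would drop this IP from the state
}

// updateStateByKey already groups by key, so no groupByKey() beforehand.
// Note: the StreamingContext needs checkpointing enabled (ssc.checkpoint(dir))
// for updateStateByKey to work.
val sessions = ipTimeStamp.updateStateByKey(updateSession)
sessions.print()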