I am trying to run stateful Spark Streaming computations over (fake)
Apache web server logs read from Kafka. The goal is to "sessionize"
the web traffic, similar to this blog post:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
The only difference is that I want to "sessionize" each page an IP
hits, rather than the entire session. I was able to do this reading
from a file of fake web traffic using Spark in batch mode, but now I
want to do it in a streaming context.
Log files are read from Kafka and parsed into K/V pairs of
(String, (String, Long, Long)) or
(IP, (requestPage, time, time))
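For concreteness, here is a minimal sketch of that parsing step, assuming
the fake log lines are simple comma-separated "ip,page,timestamp" records
(the real log format will differ; the function name is mine):

```scala
// Hypothetical parser for a fake log line of the form "ip,page,timestamp".
// Produces the (IP, (requestPage, time, time)) pair described above; the
// timestamp is duplicated so it can later serve as both first- and
// last-seen time for the page.
def parseLogLine(line: String): Option[(String, (String, Long, Long))] =
  line.split(",") match {
    case Array(ip, page, ts) => Some((ip, (page, ts.toLong, ts.toLong)))
    case _                   => None  // skip malformed lines
  }

// In the streaming job this would be applied to the Kafka stream, e.g.:
//   val ipTimeStamp = kafkaStream.map(_._2).flatMap(parseLogLine(_))
```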
I then call "groupByKey()" on these pairs. In batch mode, this produces a
(String, CompactBuffer((String, Long, Long), ...)), i.e.
(IP, CompactBuffer((requestPage, time, time), ...)).
In a StreamingContext, it produces a
(String, ArrayBuffer((String, Long, Long), ...)), like so:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
However, when the next microbatch (the next RDD in the DStream) arrives,
this information is discarded. Ultimately, I want that ArrayBuffer to
fill up over time as a given IP continues to interact, and then to run
some computations on its data to "sessionize" the page times. I believe
the operator to make that happen is "updateStateByKey". I'm having some
trouble with this operator (I'm new to both Spark and Scala); any help
is appreciated.
Thus far:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
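One sketch of how this might look, assuming ipTimeStamp is a
DStream[(String, (String, Long, Long))]: updateStateByKey's update
function never receives the key itself, only the new values seen for
that key in the current batch plus the previous state, and it already
groups new values by key, so the groupByKey call becomes unnecessary.
The function and variable names below are mine, not from any API:

```scala
import scala.collection.mutable.ArrayBuffer

// State update function: updateStateByKey invokes this once per key per
// batch. The state kept for each IP is the accumulating ArrayBuffer of
// (requestPage, time, time) hits.
def updatePageHits(
    newHits: Seq[(String, Long, Long)],
    state: Option[ArrayBuffer[(String, Long, Long)]]
): Option[ArrayBuffer[(String, Long, Long)]] = {
  val buf = state.getOrElse(ArrayBuffer.empty[(String, Long, Long)])
  buf ++= newHits      // append this batch's hits to the running buffer
  Some(buf)            // returning None would drop this key's state
}

// Wiring in the streaming job; stateful operators also require a
// checkpoint directory on the StreamingContext:
//
//   ssc.checkpoint("/tmp/spark-checkpoint")   // path is an example
//   val sessions = ipTimeStamp.updateStateByKey(updatePageHits)
```

With this shape, each micro-batch merges into the per-IP buffer instead
of being discarded, and the sessionization computation can then be run
over the accumulated state (e.g. with a map over sessions).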