Regarding memory usage, you can configure Spark's memory fraction so that
persisted state RDDs spill to disk instead of crashing the JVM. Also, the
state RDDs are periodically checkpointed to HDFS for better recoverability.
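
For example, a minimal sketch of that configuration (the property name is
the pre-1.6 "spark.storage.memoryFraction", and the app name, fraction
value, and HDFS path are all placeholders, not from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Shrink the fraction of the heap reserved for cached RDDs so heavy
// state is less likely to push the JVM into an OOM (the default is 0.6;
// 0.5 here is just an example value).
val conf = new SparkConf()
  .setAppName("StatefulAggregation")
  .set("spark.storage.memoryFraction", "0.5")

// 10-minute batches, matching the example below.
val ssc = new StreamingContext(conf, Minutes(10))

// Periodically checkpoint the state RDDs; the path is a placeholder
// for your cluster's checkpoint directory.
ssc.checkpoint("hdfs:///user/me/streaming-checkpoint")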

But this seems like a pretty involved use case that needs to keep around
all the records seen in the last 30 minutes (until flushed) within the
state. The update function is run against all the keys, even if there are
no new values for a key. So you can flush out values for keys that have
not been updated for 30 minutes. Also, there are alternative versions of
updateStateByKey that allow you to do this more efficiently (e.g., process
a whole partition at a time, and flush all old values together). Take a
look at those.
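
To make that concrete, here is a minimal sketch of a flush-on-inactivity
update function (SessionState, the 30-minute constant, and the
saveToStorage placeholder are all illustrative, not part of any Spark API):

import org.apache.spark.streaming.StreamingContext._

// Per-key state: the accumulated lines plus the time of the last update.
case class SessionState(lines: Seq[String], lastUpdated: Long)

val timeoutMs = 30 * 60 * 1000L  // 30-minute inactivity threshold

// updateStateByKey runs this for every key currently in the state, even
// when newValues is empty; that is what makes timeout-based flushing work.
def updateFunction(newValues: Seq[String],
                   state: Option[SessionState]): Option[SessionState] = {
  val now = System.currentTimeMillis()
  state match {
    case Some(s) if newValues.isEmpty && (now - s.lastUpdated) > timeoutMs =>
      // Inactive for 30 minutes: write the lines out, then return None
      // so the key is removed from the state entirely.
      // saveToStorage(s.lines)  // placeholder for the final write
      None
    case Some(s) if newValues.isEmpty =>
      Some(s)  // still within the window; keep the state unchanged
    case Some(s) =>
      Some(SessionState(s.lines ++ newValues, now))
    case None =>
      Some(SessionState(newValues, now))
  }
}

// Given lines: DStream[(String, String)] keyed by e.g. "user:domain":
// val stateStream = lines.updateStateByKey(updateFunction)
// stateStream.persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill, don't OOM

The more efficient overload takes, roughly, an
Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] function plus a
Partitioner; any key you omit from the returned iterator is dropped from
the state, so you can sweep a whole partition and flush every expired key
in one pass.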

TD



On Fri, Apr 18, 2014 at 8:49 AM, xargsgrep <ahs...@gmail.com> wrote:

> Thanks, I played around with that example and had some follow-up questions.
>
> 1. The only way I was able to accumulate data per key was to actually store
> all the data in the state, not just the timestamp (see example below);
> otherwise I don't have access to data older than the batchDuration of the
> StreamingContext. This raises concerns about memory usage, what would
> happen if a node crashes, and how partitioning and replication work across
> Spark nodes. Does it ever store the state on disk? Again, what I want to do
> is aggregate these sets of text lines by key, and when a key has been
> "inactive" (i.e., no new data has been received for that key) for a certain
> amount of time (e.g., 30 minutes), finally save them somewhere and remove
> them from the state.
>
> 2. Is the update function called only for keys that are in the current
> batchDuration's stream or for all keys that exist? If it's the former, how
> can I check the timestamp of keys from an older batch that never appear in
> the stream again?
>
> Example:
> batchDuration = 10 minutes
> timestampThreshold = 30 minutes
>
> data:
> 09:00:00 user-one foo.com
> 09:09:00 user-one foo.com
> 09:15:00 user-two bar.com
> 09:18:00 user-one foo.com
> 09:25:00 user-two bar.com
>
> So given this set of data there are 3 batches and the state would look
> something like:
> batch1: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com") }
> batch2: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
> ("09:15:00 user-two bar.com") }
> batch3: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
> ("09:15:00 user-two bar.com", "09:25:00 user-two bar.com") }
>
> Now let's assume no more data comes in for either of the two keys above.
> Since the timestamp threshold is 30 minutes, "user-one:foo.com" should be
> flushed after 09:48 and "user-two:bar.com" should be flushed after 09:55.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410p4455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
