I have a Spark Structured Streaming based application that applies a
window(...) construct followed by an aggregation.

This construct discards latecomer events that arrive past the watermark. I
need to be able to detect these late events so that I can handle them
out-of-band. The application maintains a raw store of all received events
and can re-aggregate a particular time interval for a particular device in
a secondary batch mode, as long as it knows that the interval has to be
re-aggregated, i.e. that it contains latecomer data that was discarded by
Structured Streaming due to the watermark.

I am trying to come up with a way to perform such a detection.

One approach would be to insert an additional stage before window(...) -- a
stage that monitors all events received by the stream, looks at their
timestamps, and predicts which events will be discarded by window(...) due
to the watermark. Such events could then be handled outside of Spark
Structured Streaming. The stage could be based on Dataset.foreach,
Dataset.filter, or Dataset.map: it would pass all events through unchanged,
but also monitor them, and whenever a latecomer condition is detected,
issue a side-channel notification that causes the data for the affected
device and interval to be re-aggregated later from the raw event storage,
outside the stream.
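
For concreteness, here is a rough sketch of the kind of pass-through stage
I have in mind (the event schema, the use of mapPartitions, and the
per-task tracking of the maximum timestamp are just assumptions for
illustration, not a finished design):

    import java.sql.Timestamp
    import org.apache.spark.sql.Dataset

    // Placeholder event shape for the sketch.
    case class Event(deviceId: String, eventTime: Timestamp, payload: String)

    // Pass-through stage placed before window(...): every event flows on
    // unchanged, but an event whose timestamp trails the largest timestamp
    // seen so far *in this task* by more than the watermark delay is
    // reported via a side channel. The tracked maximum is local to the task,
    // which is exactly the "global context" concern in Q1 below.
    def tapLateEvents(events: Dataset[Event], watermarkDelayMs: Long): Dataset[Event] = {
      import events.sparkSession.implicits._
      events.mapPartitions { it =>
        var maxSeenMs = Long.MinValue
        it.map { e =>
          val t = e.eventTime.getTime
          if (t > maxSeenMs) maxSeenMs = t
          else if (t < maxSeenMs - watermarkDelayMs) {
            // Side-channel notification: e.g. record (deviceId, interval)
            // in an external queue/table for later re-aggregation. Omitted.
          }
          e
        }
      }
    }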

I have a couple of questions related to the feasibility of such a construct.

Q1:

Can the data behind window(...) be shared by multiple executors or nodes,
or is it owned by one executor at any given time? If it is shared, local
monitoring of passing timestamps would be insufficient, since it lacks
global context.

Q2:

To monitor the stream, the stage needs to maintain a context. The context
can be checkpointed periodically to an external store, but I do not want to
persist and read back the context for every microbatch (or, in the foreach
case, for every individual event). I want to checkpoint the context
infrequently and maintain it in memory across microbatches.

Which raises a question: the handler function inside the stage (called by
foreach, map, or filter) needs to refer to the context object, yet it is
unclear how to establish such a reference.

I could attach a context to the stream via some global map object
(translating stream -> context), but the handler functions for
Dataset.foreach, Dataset.map, and Dataset.filter do not receive a stream
handle, and thus have no key to use for translating back to the context
object.

Nor can the association be made via a TLS (per-thread) variable, since
Spark can internally create threads for stream processing; those threads
won't inherit the parent's TLS, and may not even have the thread that
started the stream as their parent.

This appears to leave a Java static variable as the only option for the
context pointer, limiting the model to one active stream per executor. But
does the Spark specification guarantee that different executors always run
in different JVM instances?
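
To illustrate what I mean by the static-variable option, a minimal sketch
(it inherits the one-stream-per-executor limitation, and it has to be
thread-safe because several tasks can run concurrently inside one executor;
the checkpointing call is a hypothetical placeholder):

    // JVM-wide singleton holding the monitoring context.
    object MonitorContext {
      @volatile private var maxSeenMs: Long = Long.MinValue
      private var lastCheckpointMs: Long = 0L

      def observe(eventTimeMs: Long): Unit = synchronized {
        if (eventTimeMs > maxSeenMs) maxSeenMs = eventTimeMs
      }

      def isLate(eventTimeMs: Long, watermarkDelayMs: Long): Boolean =
        eventTimeMs < maxSeenMs - watermarkDelayMs

      // Checkpoint the in-memory context only occasionally, not per microbatch.
      def maybeCheckpoint(everyMs: Long): Unit = synchronized {
        val now = System.currentTimeMillis()
        if (now - lastCheckpointMs > everyMs) {
          lastCheckpointMs = now
          // persistToExternalStore(maxSeenMs)  // hypothetical external store write
        }
      }
    }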

Thanks for any advice.
