I have a Spark Structured Streaming application that performs a window(...) construct followed by an aggregation.
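For reference, the aggregation is roughly of the following shape. This is only a stripped-down sketch: the rate source and the column names (deviceId, eventTime) are stand-ins for the real source and schema, and the 10-minute watermark and 5-minute window are just example values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, window}

object WindowedAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("windowed-agg-sketch").getOrCreate()

    // Stand-in for the real event stream: the rate source emits a monotonically
    // increasing "value" and a "timestamp" column, from which a synthetic
    // deviceId and an eventTime are derived here.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .withColumn("deviceId", expr("CAST(value % 5 AS STRING)"))
      .withColumnRenamed("timestamp", "eventTime")

    // Events arriving more than 10 minutes behind the max observed event time
    // are dropped by the stateful aggregation -- these are the latecomers
    // this question is about.
    val aggregated = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(col("deviceId"), window(col("eventTime"), "5 minutes"))
      .count()

    aggregated.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```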
This construct discards latecomer events that arrive past the watermark. I need to detect these late events so I can handle them out-of-band. The application maintains a raw store of all received events and can re-aggregate a particular time interval for a particular device in a secondary batch mode, as long as it knows that this interval has to be re-aggregated, i.e. that it contains latecomer data discarded by Structured Streaming due to the watermark. I am trying to come up with a way to perform such a detection.

One approach would be to insert an additional stage before window(...): a stage that monitors all events received by the stream, looks at their timestamps, and predicts which events will be discarded by window(...) due to the watermark. Such events could then be handled outside of Spark Structured Streaming. The stage could be based on Dataset.foreach, Dataset.filter, or Dataset.map, passing all events through unchanged while also monitoring them; if a latecomer condition is detected, it would issue a side-channel notification causing data for the affected device and interval to be re-aggregated later from the raw event store, outside the stream. (A rough sketch of what I have in mind is at the end of the question.)

I have a couple of questions about the feasibility of such a construct.

Q1: Can the data behind window(...) be shared by multiple executors or nodes, or is it owned by one executor at any given time? If it is shared, then local monitoring of passing timestamps would appear to be insufficient, since it lacks global context.

Q2: To monitor the stream, the stage needs to maintain a context. The context could be checkpointed periodically to an external store, but I do not want to persist and read back the context for every microbatch (or, in the foreach case, for every individual event). I want to checkpoint the context infrequently and otherwise keep it in memory across microbatches. Which brings up a question: the handler function inside the stage (called by foreach, map, or filter) needs to refer to the context object, yet it is unclear how to make such a reference. I could attach a context to the stream via some global map object (translating stream -> context), but the handler functions for Dataset.foreach, Dataset.map, and Dataset.filter do not receive a stream handle, and thus have no key to translate back to the context object. The association cannot be made via a TLS (per-thread) variable either, since Spark can internally create threads for stream processing that won't inherit the parent's TLS (and may not even have the thread that started the stream as their parent). That appears to leave a Java static variable as the only option for the context pointer, which limits the model to one active stream per executor. But does the Spark specification guarantee that different executors run in different JVM instances?

Thanks for any advice.
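For concreteness, here is a rough sketch of the monitoring stage I have in mind, written against the hypothetical deviceId/eventTime schema from the sketch above. Everything here is a placeholder rather than a real design: the LateEventMonitor name, the lateness threshold, and the println standing in for the side-channel notification.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Row}

// A JVM-wide singleton holding the monitoring context -- i.e. the "Java static
// variable" option discussed above. The state lives per executor JVM, so it only
// ever sees the timestamps of that executor's partitions, which is exactly the
// Q1 concern: this is a local approximation of the query's actual watermark.
// A real implementation would also checkpoint this state infrequently.
object LateEventMonitor extends Serializable {
  @volatile private var maxEventTimeMs: Long = Long.MinValue
  private val allowedLatenessMs: Long = 10 * 60 * 1000L  // must match withWatermark

  // Pass-through predicate: always returns true so no events are filtered out,
  // but events that the downstream watermark would likely discard trigger a
  // notification (here just a log line; in reality a write to Kafka, a DB, etc.).
  def observe(deviceId: String, eventTime: Timestamp): Boolean = synchronized {
    val t = eventTime.getTime
    if (t > maxEventTimeMs) {
      maxEventTimeMs = t
    } else if (maxEventTimeMs - t > allowedLatenessMs) {
      println(s"suspected late event: device=$deviceId eventTime=$eventTime")
    }
    true
  }

  // Wires the monitor in front of window(...). `events` is the parsed stream
  // with deviceId/eventTime columns, as in the sketch at the top of the question.
  def attach(events: DataFrame): DataFrame = {
    val passThrough: Row => Boolean = row =>
      observe(row.getAs[String]("deviceId"), row.getAs[Timestamp]("eventTime"))
    events.filter(passThrough)
  }
}
```

The intent is that LateEventMonitor.attach(events) would replace events as the input to withWatermark(...).groupBy(window(...)). Whether the per-JVM state in such a singleton is sound, and how to keep and checkpoint it across microbatches, is exactly what Q1 and Q2 are asking about.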