df = inputStream.withWatermark("eventtime", "20 seconds").groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
// ProcessingTime trigger with a two-second micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

On Tue, 14 May 2019 at 20:40, Joe Ammann <j...@pyx.ch> wrote:

> Hi Anastasios
>
> On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> > Hi Joe,
> >
> > How often do you trigger your mini-batch? Maybe you can specify the
> > trigger time explicitly with a low value, or even better switch it off.
> >
> > See:
> > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>
> I tried different values for the trigger and settled on 10 seconds. I can
> see in the logs that this actually works (it outputs a mini-batch summary
> in the log every 10 seconds).
>
> In these log entries I also see that the watermark does not progress if no
> new data comes in. This is how I arrived at my suspicion about how it
> works internally.
>
> I understand that it is quite uncommon to have such "slowly moving
> topics", but unfortunately in my use case I have them.
>
> > On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> > > Hi all
> > >
> > > I'm fairly new to Spark structured streaming and I'm only starting to
> > > develop an understanding of the watermark handling.
> > >
> > > Our application reads data from a Kafka input topic and, as one of the
> > > first steps, it has to group incoming messages. Those messages come in
> > > bulks, e.g. 5 messages which belong to the same "business event"
> > > (share a common key), with event timestamps differing by only a few
> > > milliseconds. Then no messages arrive for, say, 3 minutes, followed by
> > > another bulk of 3 messages with very close event timestamps.
> > >
> > > I have set a watermark of 20 seconds on my streaming query, a groupBy
> > > on the shared common key, and a window of 20 seconds (sliding every
> > > 10 seconds). So something like
> > >
> > > df = inputStream.withWatermark("eventtime", "20 seconds")
> > >        .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
> > >
> > > The output mode is set to append, since I intend to join this stream
> > > with other streams later in the application.
> > >
> > > Naively, I would have expected to see any incoming bulk of messages as
> > > an aggregated message ~20 seconds after its event time on the output
> > > stream. But my observations indicate that the "latest bulk of events"
> > > always stays queued inside the query until a new bulk of events
> > > arrives and bumps up the watermark. In my example above, this means
> > > that I see the first bulk of events only after 3 minutes, when the
> > > second bulk comes in.
> > >
> > > This does indeed make some sense, and if I understand the
> > > documentation correctly, the watermark is only ever updated upon
> > > arrival of new inputs. "Real time" plays no role in the setting of
> > > watermarks.
> > >
> > > But to me this means that any bulk of events is prevented from being
> > > sent downstream until a new bulk comes in. This is not what I
> > > intended.
> > >
> > > Is my understanding more or less correct? And is there any way of
> > > bringing "real time" into the calculation of the watermark (short of
> > > producing regular dummy messages which are then filtered out again)?
> >
> > --
> > -- Anastasios Zouzias
> > <a...@zurich.ibm.com>
>
> --
> CU, Joe
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
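The stalling behaviour Joe describes can be illustrated without Spark at all. Below is a minimal plain-Python sketch (not Spark code; the class and method names are made up for illustration) of the simplified watermark bookkeeping the thread discusses: the watermark is derived purely from the maximum event time seen so far minus the allowed lateness, and it only moves when a micro-batch actually contains new rows, so wall-clock time passing has no effect.

```python
# Simplified model of the event-time watermark discussed in this thread:
# watermark = (max event time observed so far) - (allowed lateness),
# updated only when a batch contains data. Wall-clock time never enters
# the calculation, which is why a "slowly moving topic" stalls the query.

class WatermarkTracker:
    def __init__(self, delay_seconds):
        self.delay = delay_seconds
        self.max_event_time = None  # seconds since epoch, None until first data

    def on_batch(self, event_times):
        """Feed the event times of one micro-batch (may be empty)."""
        if event_times:  # an empty batch leaves the watermark untouched
            self.max_event_time = max(self.max_event_time or 0, max(event_times))

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay


tracker = WatermarkTracker(delay_seconds=20)

# A bulk of messages with near-identical event times around t=1000.
tracker.on_batch([1000, 1000, 1001])
print(tracker.watermark)  # -> 981

# Three minutes of real time pass with no input: only empty micro-batches.
for _ in range(18):  # 18 x 10-second triggers
    tracker.on_batch([])
print(tracker.watermark)  # -> still 981; the first bulk stays buffered

# Only the arrival of the next bulk bumps the watermark past the window.
tracker.on_batch([1180, 1181])
print(tracker.watermark)  # -> 1161
```

This matches the workaround mentioned at the end of the thread: injecting periodic dummy ("heartbeat") messages is effectively a way to feed `on_batch` fresh event times so the watermark keeps advancing even when the real topic is idle.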