Hi Suket, Anastasios

Many thanks for your time and your suggestions!

I tried again with various settings for the watermark and the trigger time:

- watermark 20sec, trigger 2sec
- watermark 10sec, trigger 1sec
- watermark 20sec, trigger 0sec

I also tried continuous processing mode, but since I want to do aggregations, 
which continuous processing does not support, this did not work at all.
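
For completeness, this is roughly what the continuous attempt looked like (a 
sketch; the console sink and the checkpoint interval are placeholders from my 
tests):

    import org.apache.spark.sql.streaming.Trigger

    // Continuous processing only supports map-like operations (select,
    // filter, map), so adding the groupBy/aggregation to this query is
    // rejected at analysis time.
    inputStream
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))  // checkpoint interval
      .start()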

With all the combinations above, my observations (using 'append' output mode) 
are the same: the latest group/aggregation of events is not published to the 
target Kafka topic until another event arrives with a later event timestamp, 
moving the watermark far enough ahead that the waiting group of events can 
safely be published. Neither processing time (wall-clock time) nor the trigger 
interval plays any role in that decision. Only a new event can move the 
watermark ahead and cause the output to be published. As long as no new events 
arrive, mini-batches are still triggered very frequently, but they do not 
produce new results.
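
For reference, here is a minimal sketch of the query I am describing (the 
column names are from my pipeline; the Kafka options and the checkpoint path 
are placeholders):

    import org.apache.spark.sql.functions.{col, window}
    import org.apache.spark.sql.streaming.Trigger

    val aggregated = inputStream
      .withWatermark("eventtime", "20 seconds")
      .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))
      .count()

    // In 'append' mode a window is only emitted once the watermark passes
    // its end, and the watermark only advances when new events arrive.
    aggregated
      .selectExpr("to_json(struct(*)) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
      .option("topic", "aggregates-out")                    // placeholder
      .option("checkpointLocation", "/tmp/checkpoint")      // placeholder
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()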

In the meantime, I have read a lot about the semantics of event time handling 
in various streaming systems. I now think the Spark behaviour I'm observing 
actually makes sense and is fully in line with the documentation. It just does 
not match my naive intuition.

Using 'update' mode instead of 'append' solves this: aggregates are published 
immediately (and may be amended later). But 'update' mode is not very useful 
for my application, because I need to join these aggregates with other 
streams, and stream-stream joins are only supported in 'append' mode. Using 
'update' would therefore force me to persist those intermediate aggregation 
results myself. But I'm getting the impression that this is what I will have 
to do.
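
To illustrate, with the same aggregation as above only the sink changes. A 
minimal sketch (the console sink is just for demonstration):

    // 'update' mode emits a row for every window that changed in the current
    // micro-batch, so the latest group shows up immediately, but the same
    // window may be emitted again (amended) if late events arrive.
    aggregated.writeStream
      .format("console")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()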

On 5/14/19 6:49 PM, Suket Arora wrote:
>   df = inputStream.withWatermark("eventtime", "20 seconds")
>     .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))
>     .count()
> 
> // ProcessingTime trigger with a two-second micro-batch interval
> 
> df.writeStream
>   .format("console")
>   .trigger(Trigger.ProcessingTime("2 seconds"))
>   .start()
> 
> 
> On Tue, 14 May 2019 at 20:40, Joe Ammann <j...@pyx.ch> wrote:
> 
>     Hi Anastasios
> 
>     On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
>     > Hi Joe,
>     >
>     > How often do you trigger your mini-batch? Maybe you can explicitly 
> set the trigger time to a low value or, even better, turn it off.
>     >
>     > See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
> 
>     I tried different values for the trigger, and settled on 10 seconds. I 
> can see in the logs that this actually works (it outputs a mini-batch summary 
> in the log every 10 seconds).
> 
>     In these log entries I also see that the watermark does not progress if 
> no new data is coming in. This is how I came to my suspicion about how it 
> works internally.
> 
>     I understand that it is quite uncommon to have such "slowly moving 
> topics", but unfortunately in my use case I have them.
> 
>     > On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
>     >
>     >     Hi all
>     >
>     >     I'm fairly new to Spark structured streaming and I'm only starting 
> to develop an understanding of the watermark handling.
>     >
>     >     Our application reads data from a Kafka input topic and as one of 
> the first steps, it has to group incoming messages. Those messages come in 
> bulks, e.g. 5 messages which belong to the same "business event" (share a 
> common key), with event timestamps differing in only a few millisecs. And 
> then no messages for say 3 minutes. And after that another bulk of 3 messages 
> with very close event timestamps.
>     >
>     >     I have set a watermark of 20 seconds on my streaming query, and a 
> groupBy on the shared common key, and a window of 20 seconds (10 seconds 
> sliding). So something like
>     >
>     >         df = inputStream.withWatermark("eventtime", "20 seconds")
>     >             .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))
>     >
>     >     The output mode is set to append, since I intend to join this 
> stream with other streams later in the application.
>     >
>     >     Naively, I would have expected to see any incoming bulk of messages 
> as an aggregated message ~20 seconds after its event time on the output 
> stream. But my observations indicate that the "latest bulk of events" always 
> stays queued inside the query until a new bulk of events arrives and bumps up 
> the watermark. In my example above, this means that I see the first bulk of 
> events only after 3 minutes, when the second bulk comes in.
>     >
>     >     This does indeed make some sense, and if I understand the 
> documentation correctly the watermark is only ever updated upon arrival of 
> new inputs. The "real time" does not play a role in the setting of watermarks.
>     >
>     >     But to me this means that any bulk of events is prohibited from 
> being sent downstream until a new bulk comes in. This is not what I intended.
>     >
>     >     Is my understanding more or less correct? And is there any way of 
> bringing "the real time" into the calculation of the watermark (short of 
> producing regular dummy messages which are then filtered out again)?
>     > --
>     > -- Anastasios Zouzias
>     > <a...@zurich.ibm.com>

-- 
CU, Joe
