The watermark is just a user-provided hint that tells Spark it is OK to drop internal state after some period of time. The watermark interval doesn't directly dictate whether hot rows are sent to a sink. Think of a "hot" row as data younger than the watermark. The watermark will, however, prevent "cold" rows (rows older than the watermark) from being fully processed and sent to the sink. There is no built-in notion of queuing all data and releasing it only after the watermark has advanced past the time-based groups in that queue.
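To make the hot/cold distinction concrete, here is a minimal pure-Scala model (not Spark's API; the names and the 120-second delay are illustrative, borrowed from the query below). It assumes the usual rule that the watermark trails the maximum event time seen by the configured delay:

```scala
// Toy model of watermark semantics; names are illustrative, not Spark's.
object WatermarkModel {
  val delayMs = 120000L // "120 seconds", as in withWatermark("time", "120 seconds")

  // Spark tracks the max event time observed so far; the watermark trails it.
  def watermark(maxEventTimeMs: Long): Long = maxEventTimeMs - delayMs

  // A "hot" row is at or after the watermark and still updates state;
  // a "cold" (late) row is before it and is dropped from the aggregation.
  def isHot(rowEventTimeMs: Long, maxEventTimeMs: Long): Boolean =
    rowEventTimeMs >= watermark(maxEventTimeMs)
}
```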
If you want to ensure only one row per time-based group is sent to the sink, you could get fancy with timeouts and flatMapGroupsWithState. Keep in mind that even in this scenario, the same row may be sent more than once if a micro-batch is reprocessed (this is why it is important for sinks to be idempotent: the guarantee is really at-least-once, made effectively exactly-once by idempotence). In general, I would assume you care about this fine-grained control because your sink is not idempotent.

-Chris

________________________________
From: karthikjay <[email protected]>
Sent: Thursday, March 29, 2018 5:10:09 PM
To: [email protected]
Subject: Writing record once after the watermarking interval in Spark Structured Streaming

I have the following query:

    val ds = dataFrame
      .filter(! $"requri".endsWith(".m3u8"))
      .filter(! $"bserver".contains("trimmer"))
      .withWatermark("time", "120 seconds")
      .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
      .agg(sum("bytes") / 1000000 as "byte_count")

How do I implement a ForeachWriter so that its process method is triggered only once for every watermarking interval? I.e., in the aforementioned example, I would get the following:

    10.00-10.01  Channel-1  100 (bytes)
    10.00-10.01  Channel-2  120 (bytes)
    10.01-10.02  Channel-1  110 (bytes)
    ...

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
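As a rough sketch of the emit-once idea behind the flatMapGroupsWithState suggestion above, here is a plain-Scala model rather than the real Spark API (the key shape, the in-memory maps, and the per-window byte sums are all illustrative assumptions). Each window/channel group is finalized and emitted exactly once, when the watermark passes the window end; as noted above, in real Spark even this can re-emit if a micro-batch is replayed, so the sink should still be idempotent:

```scala
import scala.collection.mutable

// Toy model of per-group state with an event-time "timeout": a window's
// aggregate is emitted exactly once, when the watermark passes the window end.
object EmitOnceModel {
  case class Key(windowEndMs: Long, channel: String)

  private val state = mutable.Map.empty[Key, Long] // running byte counts
  private val emitted = mutable.Set.empty[Key]     // guard against re-emitting

  // Accumulate bytes for a group unless its window was already finalized.
  def update(key: Key, bytes: Long): Unit =
    if (!emitted(key)) state(key) = state.getOrElse(key, 0L) + bytes

  // Called as the watermark advances; returns finalized (key, sum) pairs.
  def onWatermark(watermarkMs: Long): Seq[(Key, Long)] = {
    val done = state.toSeq.filter { case (k, _) => k.windowEndMs <= watermarkMs }
    done.foreach { case (k, _) => state.remove(k); emitted += k }
    done
  }
}
```

In real Spark this corresponds to keying by (window, channelName), keeping the running sum in GroupState, and registering an event-time timeout at the window end so the group is flushed to the ForeachWriter only when the timeout fires.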
