The watermark is just a user-provided indicator that tells Spark it's OK to 
drop internal state after some period of time. The watermark "interval" 
doesn't directly dictate whether hot rows are sent to a sink; think of a hot 
row as data younger than the watermark. However, the watermark will prevent 
cold rows (i.e., rows older than the watermark) from being fully processed 
and sent to the sink. There is no notion of requesting that all data be 
queued and released only after the watermark has advanced past the 
time-based groups in that queue.


If you want to ensure only one row per time-based group is sent to the sink, 
you could get fancy with timeouts and flatMapGroupsWithState. Keep in mind 
that even in this scenario, the same row may be sent more than once if a 
micro-batch is reprocessed. This is why it is important for sinks to be 
idempotent: the delivery guarantee is really at-least-once, which only 
becomes effectively exactly-once when the sink is idempotent.
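For illustration, a minimal sketch of the flatMapGroupsWithState approach, 
keyed by channel only (a real version would key by window as well). The 
Event/ChannelState/ChannelTotal case classes and the 60-second delay are 
assumptions for this example, not part of the original query:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical shapes for this sketch.
case class Event(time: Timestamp, channelName: String, bytes: Long)
case class ChannelState(byteCount: Double)
case class ChannelTotal(channelName: String, byteCount: Double)

// Accumulate bytes per channel; emit a single row only when the
// event-time timeout fires, i.e., once the watermark has passed the group.
def emitOnce(
    channelName: String,
    events: Iterator[Event],
    state: GroupState[ChannelState]): Iterator[ChannelTotal] = {
  if (state.hasTimedOut) {
    val total = state.get
    state.remove()
    Iterator(ChannelTotal(channelName, total.byteCount))
  } else {
    val prev = state.getOption.getOrElse(ChannelState(0.0))
    val added = events.map(_.bytes).sum / 1e6   // bytes -> MB
    state.update(ChannelState(prev.byteCount + added))
    // Fire shortly after the watermark moves past this point.
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 60000L)
    Iterator.empty
  }
}

val out = ds.as[Event]
  .withWatermark("time", "120 seconds")
  .groupByKey(_.channelName)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(emitOnce)
```

Even with this, a reprocessed micro-batch can re-fire the timeout path, so 
the at-least-once caveat above still applies.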


In general, I would assume you care about this fine-grained control because 
your sink is not idempotent.
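If making the sink idempotent is an option, a sketch of the usual shape: a 
ForeachWriter that upserts on a natural key (here window start plus channel, 
an assumption based on the query below), so a reprocessed micro-batch 
overwrites instead of duplicating:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical writer: the store and the upsert statement are placeholders.
class UpsertWriter extends ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // Open a connection to the target store here.
    true
  }
  def process(row: Row): Unit = {
    // Upsert keyed on (window_start, channelName), e.g. in SQL terms:
    //   INSERT ... ON CONFLICT (window_start, channel) DO UPDATE ...
  }
  def close(errorOrNull: Throwable): Unit = {
    // Close the connection.
  }
}
```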


-Chris

________________________________
From: karthikjay <[email protected]>
Sent: Thursday, March 29, 2018 5:10:09 PM
To: [email protected]
Subject: Writing record once after the watermarking interval in Spark 
Structured Streaming

I have the following query:

    val ds = dataFrame
      .filter(! $"requri".endsWith(".m3u8"))
      .filter(! $"bserver".contains("trimmer"))
      .withWatermark("time", "120 seconds")
      .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
      .agg(sum("bytes") / 1000000 as "byte_count")

How do I implement a foreach writer so that its process method is triggered
only once for every watermarking interval? I.e., in the aforementioned
example, I would get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
...


