Note that window() needs the event-time column as its first argument, and
the parentheses in your snippet need balancing:

df = inputStream.withWatermark("eventtime", "20 seconds")
  .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))

// ProcessingTime trigger with a two-second micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
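For completeness, here is a minimal end-to-end sketch of that wiring. The
Kafka bootstrap servers, topic name, schema mapping, and the count()
aggregation are placeholder assumptions, not taken from your job (and the
kafka source needs the spark-sql-kafka package on the classpath):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, window}
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder().appName("watermark-demo").getOrCreate()

  // Hypothetical Kafka source; adjust servers/topic to your setup.
  val inputStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()
    .selectExpr("CAST(key AS STRING) AS sharedId", "timestamp AS eventtime")

  // 20-second watermark, 20-second windows sliding every 10 seconds.
  val df = inputStream
    .withWatermark("eventtime", "20 seconds")
    .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))
    .count()

  // In append mode a window is only emitted once the watermark has
  // passed the end of the window.
  df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start()
    .awaitTermination()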


On Tue, 14 May 2019 at 20:40, Joe Ammann <j...@pyx.ch> wrote:

> Hi Anastasios
>
> On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> > Hi Joe,
> >
> > How often do you trigger your mini-batch? Maybe you can explicitly set
> > the trigger interval to a low value, or even better leave it unset.
> >
> > See:
> > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>
> I tried different values for the trigger, and settled on 10 seconds. I can
> see in the logs that this actually works (it outputs a mini-batch summary
> in the log every 10 seconds).
>
> In these log entries I can also see that the watermark does not
> progress when no new data comes in. This is how I arrived at my suspicion
> about how it works internally.
>
> I understand that it is quite uncommon to have such "slowly moving
> topics", but unfortunately in my use case I have them.
>
> > On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> >
> >     Hi all
> >
> >     I'm fairly new to Spark structured streaming and I'm only starting
> >     to develop an understanding of the watermark handling.
> >
> >     Our application reads data from a Kafka input topic and, as one of
> >     the first steps, it has to group incoming messages. Those messages
> >     come in bulks, e.g. 5 messages which belong to the same "business
> >     event" (they share a common key), with event timestamps differing by
> >     only a few milliseconds. Then no messages arrive for, say, 3 minutes,
> >     and after that another bulk of 3 messages with very close event
> >     timestamps.
> >
> >     I have set a watermark of 20 seconds on my streaming query, a
> >     groupBy on the shared common key, and a window of 20 seconds
> >     (sliding every 10 seconds). So something like
> >
> >         df = inputStream.withWatermark("eventtime", "20 seconds")
> >           .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))
> >
> >     The output mode is set to append, since I intend to join this
> >     stream with other streams later in the application.
> >
> >     Naively, I would have expected to see any incoming bulk of messages
> >     as an aggregated message ~20 seconds after its eventtime on the
> >     output stream. But my observations indicate that the "latest bulk of
> >     events" always stays queued inside the query until a new bulk of
> >     events arrives and bumps up the watermark. In my example above, this
> >     means that I see the first bulk of events only after 3 minutes, when
> >     the second bulk comes in.
> >
> >     This does indeed make some sense, and if I understand the
> >     documentation correctly the watermark is only ever updated upon
> >     arrival of new inputs. The "real time" does not play a role in the
> >     setting of watermarks.
> >
> >     But to me this means that any bulk of events is prevented from
> >     being sent downstream until a new bulk comes in. This is not what I
> >     intended.
> >
> >     Is my understanding more or less correct? And is there any way of
> >     bringing "the real time" into the calculation of the watermark
> >     (short of producing regular dummy messages which are then filtered
> >     out again)?
> > --
> > -- Anastasios Zouzias
>
>
> --
> CU, Joe
>
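As for the last question in the quoted thread: to my knowledge the
watermark is indeed only advanced from the event times of newly arriving
rows, so the usual workaround is the heartbeat idea already mentioned
there, i.e. a small producer that keeps the topic moving. A rough sketch,
where the topic name, heartbeat key, and interval are illustrative
assumptions:

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Send a heartbeat record every 5 seconds so the watermark keeps
  // advancing even when no real business events arrive.
  while (true) {
    producer.send(new ProducerRecord("events", "__heartbeat__", ""))
    Thread.sleep(5000)
  }

In the streaming query the heartbeats can then be dropped after the
watermark has been applied, e.g.

  inputStream
    .withWatermark("eventtime", "20 seconds")
    .filter(col("sharedId") =!= "__heartbeat__")
    .groupBy(col("sharedId"), window(col("eventtime"), "20 seconds", "10 seconds"))

so they advance the watermark without ever showing up in the aggregated
groups.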
