Hi Radu,

yes, that can happen in a parallel setup and depends on the relative "speed"
of the parallel threads.
An operator only advances its own event-time clock to the minimum of
the last watermarks received from each of its input channels.
If one input channel is "slow", the event-time of the operator lags behind,
and "late" events from the other threads are still processed correctly
because the operator's event-time has not advanced yet.
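
To make the rule above concrete, here is a minimal sketch in plain Scala. It
is not Flink's implementation; the names OperatorClock, onWatermark and
currentEventTime are made up for this illustration. It only models the "minimum
of the last watermark per input channel" behaviour.

```scala
// Hypothetical model of an operator's event-time clock (not Flink code).
final class OperatorClock(numChannels: Int) {
  // Last watermark seen on each input channel; Long.MinValue means "none yet".
  private val lastWatermark: Array[Long] = Array.fill(numChannels)(Long.MinValue)

  // The operator's event-time is the minimum over all input channels.
  def currentEventTime: Long = lastWatermark.min

  // Record a watermark from one channel and return the operator's event-time.
  def onWatermark(channel: Int, watermark: Long): Long = {
    // A channel's watermark never moves backwards.
    lastWatermark(channel) = math.max(lastWatermark(channel), watermark)
    currentEventTime
  }
}

object OperatorClockExample {
  def main(args: Array[String]): Unit = {
    val clock = new OperatorClock(numChannels = 2)
    // The fast channel is already at 14000, but the slow channel has sent
    // nothing, so the operator's event-time has not moved at all.
    println(clock.onWatermark(channel = 0, watermark = 14000L))
    // Once the slow channel reports 1000, the operator advances to 1000 only.
    println(clock.onWatermark(channel = 1, watermark = 1000L))
  }
}
```

As long as the slow channel stays behind, event-time timers registered for
timestamps above its watermark cannot fire in this model, no matter how far
the other channel has progressed.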

So, event-time is not deterministic when it comes to which records are
dropped.
The watermark documentation might be helpful as well [1].
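
The non-determinism mentioned above can be illustrated with a small
simulation. This is only a sketch under simplifying assumptions: the types
Record and Watermark and the function droppedTimestamps are hypothetical, and
a record is treated as late when its timestamp is at or below the operator's
event-time. The actual operator logic in Flink may differ in detail.

```scala
object LateRecordDemo {
  sealed trait Input
  final case class Record(channel: Int, timestamp: Long) extends Input
  final case class Watermark(channel: Int, timestamp: Long) extends Input

  // Replays one arrival order at a single operator and returns the
  // timestamps of the records that were dropped as late.
  def droppedTimestamps(numChannels: Int, arrivals: Seq[Input]): Seq[Long] = {
    val lastWatermark = Array.fill(numChannels)(Long.MinValue)
    val dropped = Seq.newBuilder[Long]
    arrivals.foreach {
      case Watermark(ch, ts) =>
        lastWatermark(ch) = math.max(lastWatermark(ch), ts)
      case Record(_, ts) =>
        // Assumption: late means "not after the operator's event-time",
        // which is the minimum watermark over all channels.
        if (ts <= lastWatermark.min) dropped += ts
    }
    dropped.result()
  }

  // Both channels have delivered watermark 14000 before the old record arrives.
  val bothFast: Seq[Input] =
    Seq(Watermark(0, 14000L), Watermark(1, 14000L), Record(0, 1000L))

  // Same inputs, but channel 1 is slow: its watermark arrives after the record.
  val oneSlow: Seq[Input] =
    Seq(Watermark(0, 14000L), Record(0, 1000L), Watermark(1, 14000L))
}
```

With bothFast the record with timestamp 1000 is dropped; with oneSlow the
same record is accepted, because the operator's event-time had not advanced
when it arrived. The inputs are identical and only the interleaving differs.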

Cheers,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams

2017-04-27 22:09 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Re-hi,
>
> I debugged the test for the event rowtime a bit.
>
> I tested the testBoundNonPartitionedEventTimeWindowWithRange from
> SQLITCase class
>
> I would expect that once a watermark is triggered, 1) onTimer is called
> to process the events that have arrived so far, and 2) events that
> arrive afterwards are dropped. However, it seems that almost the entire
> input can arrive in the processElement function before onTimer is
> triggered.
>
> Moreover, suppose you modify the input to add an out-of-order event (see
> the dataset below, where I added an event with timestamp 1000 after
> watermark 14000). I would expect this event to be dropped. However, in
> some runs it is not dropped: it can happen that onTimer has not been
> triggered yet when this event arrives, so the event is registered.
> Is this correct? Am I missing something?
>
>
>    @Test
>   def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
>     val data = Seq(
>       Left((1500L, (1L, 15, "Hello"))),
>       Left((1600L, (1L, 16, "Hello"))),
>       Left((1000L, (1L, 1, "Hello"))),
>       Left((2000L, (2L, 2, "Hello"))),
>       Right(1000L),
>       Left((2000L, (2L, 2, "Hello"))),
>       Left((2000L, (2L, 3, "Hello"))),
>       Left((3000L, (3L, 3, "Hello"))),
>       Right(2000L),
>       Left((4000L, (4L, 4, "Hello"))),
>       Right(3000L),
>       Left((5000L, (5L, 5, "Hello"))),
>       Right(5000L),
>       Left((6000L, (6L, 6, "Hello"))),
>       Left((6500L, (6L, 65, "Hello"))),
>       Right(7000L),
>       Left((9000L, (6L, 9, "Hello"))),
>       Left((9500L, (6L, 18, "Hello"))),
>       Left((9000L, (6L, 9, "Hello"))),
>       Right(10000L),
>       Left((10000L, (7L, 7, "Hello World"))),
>       Left((11000L, (7L, 17, "Hello World"))),
>       Left((11000L, (7L, 77, "Hello World"))),
>       Right(12000L),
>       Left((14000L, (7L, 18, "Hello World"))),
>       Right(14000L),
>       Left((15000L, (8L, 8, "Hello World"))),
>       Left((1000L, (8L, 8, "Too late - Hello World"))), // out of order, should be dropped
>       Right(17000L),
>       Left((20000L, (20L, 20, "Hello World"))),
>       Right(19000L))
>
>
>
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, April 27, 2017 3:17 PM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks
> needed?
>
> Hi Radu,
>
> event-time processing requires watermarks. Operators use watermarks to
> compute the current event-time.
> The ProcessFunctions for over range windows use the TimerServices to group
> elements by time.
> In case of event-time, the timers are triggered by the event-time of the
> operator which is derived from the received watermarks.
> In case of processing-time, the timers are triggered based on the
> wallclock time of the operator.
>
> So by using event-time timers, we implicitly rely on the watermarks because
> the timers are triggered based on the received watermarks.
>
> Best, Fabian
>
>
> 2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi,
> >
> > I am looking at the implementation of RowTimeBoundedRangeOver (in the
> > context of Stream SQL). I see that the logic is that progress
> > happens based on the timestamps of the row events, i.e., when an event
> > arrives we register it to be processed based on its timestamp
> > (ctx.timerService.registerEventTimeTimer(triggeringTs)).
> >
> > In the onTimer we remove (retract) data that has expired. However, we
> > do not consider watermarks nor some allowed latency for the events or
> > anything like this, which makes me ask:
> > Don't we need to work with watermarks when we deal with event time? And
> > keep the events within the allowed delayed/next watermark?  Am I
> > missing something? Or maybe we do not consider at this point
> > allowedLateness  for this version?
> >
> > Thanks
> >
> > Best regards,
> >
> >
>
