Re-hi,

I debugged the test for the event rowtime a bit.

I ran testBoundNonPartitionedEventTimeWindowWithRange from the SQLITCase 
class.

I would expect that once a watermark is triggered, 1) onTimer is called to 
process the events that arrived so far and 2) events that arrive afterwards 
with timestamps behind that watermark are dropped. However, it seems that 
almost the entire input can arrive in the processElement function before 
onTimer is triggered.

Moreover, you can modify the input to add an out-of-order event (see the 
dataset below, where after watermark 14000 I added an event with timestamp 
1000). I would expect this event to be dropped. However, across different runs 
it is sometimes not dropped: it can happen that onTimer has not been triggered 
by the time this event arrives, so the event is registered. Is this correct? 
Am I missing something?


  @Test
  def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
    val data = Seq(
      Left((1500L, (1L, 15, "Hello"))),
      Left((1600L, (1L, 16, "Hello"))),
      Left((1000L, (1L, 1, "Hello"))),
      Left((2000L, (2L, 2, "Hello"))),
      Right(1000L),
      Left((2000L, (2L, 2, "Hello"))),
      Left((2000L, (2L, 3, "Hello"))),
      Left((3000L, (3L, 3, "Hello"))),
      Right(2000L),
      Left((4000L, (4L, 4, "Hello"))),
      Right(3000L),
      Left((5000L, (5L, 5, "Hello"))),
      Right(5000L),
      Left((6000L, (6L, 6, "Hello"))),
      Left((6500L, (6L, 65, "Hello"))),
      Right(7000L),
      Left((9000L, (6L, 9, "Hello"))),
      Left((9500L, (6L, 18, "Hello"))),
      Left((9000L, (6L, 9, "Hello"))),
      Right(10000L),
      Left((10000L, (7L, 7, "Hello World"))),
      Left((11000L, (7L, 17, "Hello World"))),
      Left((11000L, (7L, 77, "Hello World"))),
      Right(12000L),
      Left((14000L, (7L, 18, "Hello World"))),
      Right(14000L),
      Left((15000L, (8L, 8, "Hello World"))),
      // This event is out of order and should be dropped.
      Left((1000L, (8L, 8, "Too late - Hello World"))),
      Right(17000L),
      Left((20000L, (20L, 20, "Hello World"))),
      Right(19000L))
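
To make the expectation above concrete, here is a minimal sketch of the lateness rule I have in mind. It is a toy model, not Flink code: the names LatenessSketch and lateElements are made up for illustration, it assumes a single in-order input, and it uses the convention that a watermark t means no more elements with timestamp <= t should arrive. It only flags such elements. Whether an operator actually drops them is a separate question.

```scala
// Toy model (hypothetical names, no Flink dependency): replay an in-order
// stream of elements (Left) and watermarks (Right) and collect the elements
// whose timestamp is not ahead of the last watermark seen before them.
object LatenessSketch {
  type Row = (Long, Int, String)
  type Input = Either[(Long, Row), Long]

  /** Elements that arrive with timestamp <= the current watermark. */
  def lateElements(input: Seq[Input]): Seq[(Long, Row)] = {
    var currentWatermark = Long.MinValue
    val late = Seq.newBuilder[(Long, Row)]
    input.foreach {
      case Right(wm) => currentWatermark = math.max(currentWatermark, wm)
      case Left(elem) if elem._1 <= currentWatermark => late += elem
      case Left(_) => () // ahead of the watermark, so not late
    }
    late.result()
  }

  def main(args: Array[String]): Unit = {
    // Tail of the dataset above, around the out-of-order event.
    val tail: Seq[Input] = Seq(
      Left((14000L, (7L, 18, "Hello World"))),
      Right(14000L),
      Left((15000L, (8L, 8, "Hello World"))),
      Left((1000L, (8L, 8, "Too late - Hello World"))),
      Right(17000L))
    lateElements(tail).foreach(println)
  }
}
```

On this tail only the event with timestamp 1000 is flagged, because it is the only one that arrives behind watermark 14000.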




-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Thursday, April 27, 2017 3:17 PM
To: dev@flink.apache.org
Subject: Re: question about rowtime processfunction - are watermarks needed?

Hi Radu,

Event-time processing requires watermarks. Operators use watermarks to compute 
the current event-time.
The ProcessFunctions for over range windows use the TimerServices to group 
elements by time.
In case of event-time, the timers are triggered by the event-time of the 
operator, which is derived from the received watermarks.
In case of processing-time, the timers are triggered based on the wall-clock 
time of the operator.

So by using event-time timers, we implicitly rely on the watermarks because the 
timers are triggered based on the received watermarks.
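
As a rough illustration of that dependency, the sketch below models an operator whose event-time clock only moves when a watermark arrives. It is a toy model under stated assumptions, not Flink's TimerService: ToyTimerService and advanceWatermark are hypothetical names, and registerEventTimeTimer merely mirrors the name of the real method.

```scala
import scala.collection.mutable

// Toy model (hypothetical, no Flink dependency): event-time timers fire only
// when the watermark advances to or past their timestamp.
object EventTimeTimerSketch {
  final class ToyTimerService(onTimer: Long => Unit) {
    private var watermark = Long.MinValue
    // Sorted set: each timestamp is registered at most once.
    private val timers = mutable.TreeSet.empty[Long]

    def currentWatermark: Long = watermark

    // Registering a timer never fires it, no matter how many elements arrive.
    def registerEventTimeTimer(ts: Long): Unit = timers += ts

    // Receiving a watermark is the only thing that moves the clock.
    def advanceWatermark(wm: Long): Unit = {
      watermark = math.max(watermark, wm)
      while (timers.nonEmpty && timers.head <= watermark) {
        val ts = timers.head
        timers -= ts
        onTimer(ts)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val service = new ToyTimerService(ts => println(s"onTimer($ts)"))
    service.registerEventTimeTimer(1500L)
    service.registerEventTimeTimer(1000L)
    service.registerEventTimeTimer(2000L)
    service.advanceWatermark(1000L) // fires the timer for 1000
    service.advanceWatermark(2000L) // fires the timers for 1500 and 2000
  }
}
```

In this model any number of elements can register timers before the first watermark arrives, and none of those timers fire until it does.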

Best, Fabian


2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I am looking at the implementation of RowTimeBoundedRangeOver (in the 
> context of Stream SQL). I see that the logic is that the progress 
> happens based on the timestamps of the row event, i.e., when an event 
> arrives we register it to be processed based on its timestamp 
> (ctx.timerService.registerEventTimeTimer(triggeringTs)).
>
> In onTimer we remove (retract) data that has expired. However, we 
> do not consider watermarks, nor some allowed lateness for the events or 
> anything like this, which makes me ask:
> Don't we need to work with watermarks when we deal with event time? And 
> keep the events within the allowed delay/next watermark? Am I 
> missing something? Or maybe we do not consider allowedLateness at this 
> point for this version?
>
> Thanks
>
> Best regards,
>
>
