Re-hi,

I debugged the test for the event rowtime a bit.

I tested testBoundNonPartitionedEventTimeWindowWithRange from the SQLITCase class. I would expect that once a watermark is triggered: 1) onTimer is called to process the events that have arrived so far, and 2) events that arrive afterwards are dropped. However, it seems that almost the entire input can arrive in the processElement function before onTimer is triggered.

Moreover, if you modify the input to add an out-of-order event (see the dataset below, where I added an event with timestamp 1000 after watermark 14000), I would expect that event to be dropped. However, in different runs it can happen that it is not dropped. Basically, it can happen that onTimer was never triggered by the time this event arrives, so the event is registered.

Is this correct? Am I missing something?

    @Test
    def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
      val data = Seq(
        Left((1500L, (1L, 15, "Hello"))),
        Left((1600L, (1L, 16, "Hello"))),
        Left((1000L, (1L, 1, "Hello"))),
        Left((2000L, (2L, 2, "Hello"))),
        Right(1000L),
        Left((2000L, (2L, 2, "Hello"))),
        Left((2000L, (2L, 3, "Hello"))),
        Left((3000L, (3L, 3, "Hello"))),
        Right(2000L),
        Left((4000L, (4L, 4, "Hello"))),
        Right(3000L),
        Left((5000L, (5L, 5, "Hello"))),
        Right(5000L),
        Left((6000L, (6L, 6, "Hello"))),
        Left((6500L, (6L, 65, "Hello"))),
        Right(7000L),
        Left((9000L, (6L, 9, "Hello"))),
        Left((9500L, (6L, 18, "Hello"))),
        Left((9000L, (6L, 9, "Hello"))),
        Right(10000L),
        Left((10000L, (7L, 7, "Hello World"))),
        Left((11000L, (7L, 17, "Hello World"))),
        Left((11000L, (7L, 77, "Hello World"))),
        Right(12000L),
        Left((14000L, (7L, 18, "Hello World"))),
        Right(14000L),
        Left((15000L, (8L, 8, "Hello World"))),
        // this event is out of order and should be dropped
        Left((1000L, (8L, 8, "Too late - Hello World"))),
        Right(17000L),
        Left((20000L, (20L, 20, "Hello World"))),
        Right(19000L))

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, April 27, 2017 3:17 PM
To: dev@flink.apache.org
Subject: Re: question about rowtime processfunction - are watermarks needed?

Hi Radu,

event-time processing requires watermarks. Operators use watermarks to compute the current event-time.

The ProcessFunctions for over range windows use the TimerServices to group elements by time. In case of event-time, the timers are triggered by the event-time of the operator, which is derived from the received watermarks. In case of processing-time, the timers are triggered based on the wallclock time of the operator.

So by using event-time timers, we implicitly rely on the watermarks, because the timers are triggered based on the received watermarks.

Best, Fabian

2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I am looking at the implementation of RowTimeBoundedRangeOver (in the
> context of Stream SQL). I see that the logic is that the progress
> happens based on the timestamps of the row event - i.e., when an event
> arrives we register it to be processed based on its timestamp
> (ctx.timerService.registerEventTimeTimer(triggeringTs)).
>
> In the onTimer we remove (retract) data that has expired. However, we
> do not consider watermarks nor some allowed latency for the events or
> anything like this, which makes me ask:
> Don't we need to work with watermarks when we deal with event time? And
> keep the events within the allowed delay/next watermark? Am I
> missing something? Or maybe we do not consider at this point
> allowedLateness for this version?
>
> Thanks
>
> Best regards,
>
>
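
To make Fabian's point concrete (event-time timers fire only when a received watermark advances the operator's event time), here is a minimal, Flink-free sketch in Scala. Everything in it is hypothetical and for illustration only: the class EventTimeSketch, its methods, and the rule that an element whose timestamp is at or below the current watermark is dropped are assumptions, not the behaviour of RowTimeBoundedRangeOver or of Flink's TimerService. It only models a single-threaded operator consuming the same Left(element) / Right(watermark) encoding as the test data.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an event-time operator; NOT Flink's API.
final class EventTimeSketch {
  private var currentWatermark = Long.MinValue
  // Pending timers keyed by timestamp, with the values buffered for each.
  private val pending = mutable.TreeMap.empty[Long, Vector[Int]]
  // What "onTimer" emitted, and what was rejected as late, in order.
  val fired = mutable.ArrayBuffer.empty[(Long, Vector[Int])]
  val dropped = mutable.ArrayBuffer.empty[(Long, Int)]

  // Buffer the element and register a timer at its timestamp.
  // Assumption: elements at or below the current watermark are dropped.
  def processElement(ts: Long, value: Int): Unit =
    if (ts <= currentWatermark) dropped += ((ts, value))
    else pending(ts) = pending.getOrElse(ts, Vector.empty) :+ value

  // A watermark advances event time and fires every timer that is now due.
  def processWatermark(wm: Long): Unit = {
    currentWatermark = math.max(currentWatermark, wm)
    val due = pending.keysIterator.takeWhile(_ <= currentWatermark).toList
    due.foreach { ts =>
      fired += ((ts, pending(ts)))
      pending -= ts
    }
  }
}

object EventTimeSketch {
  // Left = (timestamp, value) element, Right = watermark, as in the test data.
  def run(input: Seq[Either[(Long, Int), Long]]): EventTimeSketch = {
    val op = new EventTimeSketch
    input.foreach {
      case Left((ts, v)) => op.processElement(ts, v)
      case Right(wm)     => op.processWatermark(wm)
    }
    op
  }
}
```

In this model nothing fires between watermarks, however many elements arrive, and whether the late element is dropped depends entirely on whether the watermark reached the operator before it. It says nothing about how the test source actually interleaves elements and watermarks.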