Hi Radu,

yes, that can happen in a parallel setup, and it depends on the "speed" of the parallel threads. An operator only advances its own event-time clock to the minimum of the last watermarks received from each of its input channels. If one input channel is "slow", the event time of the operator lags behind, and "late" events from the other threads are still processed correctly because the operator's event time has not been advanced yet.
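To make the min-watermark rule concrete, here is a minimal Scala sketch. `EventTimeClock`, `onWatermark` and `isLate` are hypothetical names, not Flink classes. The sketch assumes that a record counts as late when its timestamp is not greater than the operator's current watermark.

```scala
// Hypothetical model of how an operator with several input channels derives
// its event-time clock. An illustration only, not Flink's implementation.
final class EventTimeClock(numChannels: Int) {
  // Last watermark seen on each input channel; Long.MinValue means "none yet".
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)

  // The operator's event time is the minimum over all input channels.
  def currentWatermark: Long = channelWatermarks.min

  // Records a watermark from one channel and returns the operator's watermark,
  // which stays unchanged while any other channel is still behind.
  def onWatermark(channel: Int, watermark: Long): Long = {
    // Watermarks on a single channel never move backwards.
    channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
    currentWatermark
  }

  // Assumption: "late" is judged only against the operator's own event time.
  def isLate(timestamp: Long): Boolean = timestamp <= currentWatermark
}
```

With two input channels, after channel 0 reports watermark 14000 while channel 1 has reported nothing, `isLate(1000L)` returns false and the record is processed; once channel 1 reports 2000, the same record would be treated as late. Which of the two happens in a real run depends on how the parallel threads interleave.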
So, with event time it is not deterministic which records are dropped. The watermark documentation might be helpful as well [1].

Cheers, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams

2017-04-27 22:09 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Re-hi,
>
> I debugged the test for the event-time rowtime a bit.
>
> I tested testBoundNonPartitionedEventTimeWindowWithRange from the SQLITCase class.
>
> I would expect that once a watermark is triggered, 1) onTimer is called to process the events that have arrived so far, and 2) future events that arrive are dropped. However, it seems that almost the entire input can arrive in the processElement function before onTimer is triggered.
>
> Moreover, if you modify the input to add an out-of-order event (see the dataset below, where after watermark 14000 I added an event with timestamp 1000), I would expect this event to be dropped. However, in different runs it can happen that it is not dropped: onTimer may never have been triggered, so this event arrives and is registered. Is this correct? Am I missing something?
>
> @Test
> def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
>   val data = Seq(
>     Left((1500L, (1L, 15, "Hello"))),
>     Left((1600L, (1L, 16, "Hello"))),
>     Left((1000L, (1L, 1, "Hello"))),
>     Left((2000L, (2L, 2, "Hello"))),
>     Right(1000L),
>     Left((2000L, (2L, 2, "Hello"))),
>     Left((2000L, (2L, 3, "Hello"))),
>     Left((3000L, (3L, 3, "Hello"))),
>     Right(2000L),
>     Left((4000L, (4L, 4, "Hello"))),
>     Right(3000L),
>     Left((5000L, (5L, 5, "Hello"))),
>     Right(5000L),
>     Left((6000L, (6L, 6, "Hello"))),
>     Left((6500L, (6L, 65, "Hello"))),
>     Right(7000L),
>     Left((9000L, (6L, 9, "Hello"))),
>     Left((9500L, (6L, 18, "Hello"))),
>     Left((9000L, (6L, 9, "Hello"))),
>     Right(10000L),
>     Left((10000L, (7L, 7, "Hello World"))),
>     Left((11000L, (7L, 17, "Hello World"))),
>     Left((11000L, (7L, 77, "Hello World"))),
>     Right(12000L),
>     Left((14000L, (7L, 18, "Hello World"))),
>     Right(14000L),
>     Left((15000L, (8L, 8, "Hello World"))),
>     Left((1000L, (8L, 8, "Too late - Hello World"))), // event is out of order and should be dropped
>     Right(17000L),
>     Left((20000L, (20L, 20, "Hello World"))),
>     Right(19000L))
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, April 27, 2017 3:17 PM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks needed?
>
> Hi Radu,
>
> event-time processing requires watermarks. Operators use watermarks to compute the current event time.
> The ProcessFunctions for OVER RANGE windows use the TimerService to group elements by time.
> In the case of event time, the timers are triggered by the event time of the operator, which is derived from the received watermarks.
> In the case of processing time, the timers are triggered based on the wall-clock time of the operator.
>
> So by using event-time timers, we implicitly rely on the watermarks, because the timers are triggered based on the received watermarks.
>
> Best, Fabian
>
> 2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi,
> >
> > I am looking at the implementation of RowTimeBoundedRangeOver (in the context of Stream SQL). I see that progress is driven by the timestamps of the row events, i.e., when an event arrives we register it to be processed based on its timestamp (ctx.timerService.registerEventTimeTimer(triggeringTs)).
> >
> > In onTimer we remove (retract) data that has expired. However, we do not consider watermarks, an allowed lateness for the events, or anything like this, which makes me ask:
> > Don't we need to work with watermarks when we deal with event time, and keep the events within the allowed delay/next watermark? Am I missing something? Or do we simply not consider allowedLateness in this version?
> >
> > Thanks
> >
> > Best regards,
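The timer behaviour discussed in this thread can be sketched in the same spirit. `SimpleEventTimeTimers` is a hypothetical, heavily simplified stand-in for an event-time timer service, not Flink's TimerService. It assumes, as described above, that timers fire only when the operator's watermark advances to or past their timestamp, so registering a timer in processElement does nothing by itself and many elements can be buffered before any onTimer call.

```scala
import scala.collection.mutable

// Hypothetical, simplified event-time timer service: an illustration of
// watermark-driven timers, not Flink's implementation.
final class SimpleEventTimeTimers(onTimer: Long => Unit) {
  // Pending timer timestamps, smallest first; duplicate registrations collapse.
  private val pending = mutable.SortedSet.empty[Long]
  private var watermark = Long.MinValue

  def currentWatermark: Long = watermark

  // Registering only records the timer; nothing fires here.
  def registerEventTimeTimer(ts: Long): Unit = pending += ts

  // A new watermark fires every pending timer with timestamp <= watermark,
  // in timestamp order. Watermarks never move backwards.
  def advanceWatermark(newWatermark: Long): Unit = {
    watermark = math.max(watermark, newWatermark)
    while (pending.nonEmpty && pending.head <= watermark) {
      val ts = pending.head
      pending -= ts
      onTimer(ts)
    }
  }
}
```

For example, registering timers at 1000, 1500 and 3000 and then advancing the watermark to 2000 fires only the first two, in timestamp order; a later watermark of 1000 is ignored because the watermark does not move backwards.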