Hi,
Thanks again Fabian for the explanation. 
Considering what you said - is there anymore a duality with the batch case? As 
the stream cases are non-deterministic I would say the duality in the sense 
that a query on the stream should return the same as the query on the batched 
data does not hold anymore?
I am just trying to get a deeper understanding of this, which I think will 
apply also to the other functions and SQL operators...sorry for bothering you 
with this.

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Friday, April 28, 2017 9:56 AM
To: dev@flink.apache.org
Subject: Re: question about rowtime processfunction - are watermarks needed?

Hi Radu,

yes that might happen in a parallel setup and depends on the "speed" of the 
parallel threads.
An operator does only increment its own event-time clock to the minimum of the 
last watermark received from each input channel.
If one input channel is "slow", the event-time of an operator lacks behind and 
"late" events of the other threads are correctly processed because the 
operators event-time was not incremented yet.

So, event-time is not deterministic when it comes to which records are dropped.
The watermark documentation might be helpful as well [1].

Cheers,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams

2017-04-27 22:09 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Re-hi,
>
> I debuged a bit the test for the Event rowtime
>
> I tested the testBoundNonPartitionedEventTimeWindowWithRange from 
> SQLITCase class
>
> Although I would expect that once a watermark is triggered: 1) the on 
> timer will be called to process the events that arrived so far and 2) 
> the future events that arrive will be dropped. However, it seems that 
> almost the entire input can arrive in the processElement function 
> before the onTimer is triggered.
>
> Moreover, if you modify the input to add an un-ordered event (see 
> dataset below where I added after watermark 14000 ...an event with 
> watermark 1000...as far as I would expect this should be dropped. 
> However, in different runs it can happen that it will be not dropped. 
> Basically it can happen that the onTimer was never triggered and this 
> event arrives and it is registered). Is this correct? Am I missing something?
>
>
>    @Test
>   def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
>     val data = Seq(
>       Left((1500L, (1L, 15, "Hello"))),
>       Left((1600L, (1L, 16, "Hello"))),
>       Left((1000L, (1L, 1, "Hello"))),
>       Left((2000L, (2L, 2, "Hello"))),
>       Right(1000L),
>       Left((2000L, (2L, 2, "Hello"))),
>       Left((2000L, (2L, 3, "Hello"))),
>       Left((3000L, (3L, 3, "Hello"))),
>       Right(2000L),
>       Left((4000L, (4L, 4, "Hello"))),
>       Right(3000L),
>       Left((5000L, (5L, 5, "Hello"))),
>       Right(5000L),
>       Left((6000L, (6L, 6, "Hello"))),
>       Left((6500L, (6L, 65, "Hello"))),
>       Right(7000L),
>       Left((9000L, (6L, 9, "Hello"))),
>       Left((9500L, (6L, 18, "Hello"))),
>       Left((9000L, (6L, 9, "Hello"))),
>       Right(10000L),
>       Left((10000L, (7L, 7, "Hello World"))),
>       Left((11000L, (7L, 17, "Hello World"))),
>       Left((11000L, (7L, 77, "Hello World"))),
>       Right(12000L),
>       Left((14000L, (7L, 18, "Hello World"))),
>       Right(14000L),
>       Left((15000L, (8L, 8, "Hello World"))),
>       Left((1000L, (8L, 8, "Too late - Hello World"))),   ///event is out
> of ordered and showed be droppped
>       Right(17000L),
>       Left((20000L, (20L, 20, "Hello World"))),
>       Right(19000L))
>
>
>
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, April 27, 2017 3:17 PM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks 
> needed?
>
> Hi Radu,
>
> event-time processing requires watermarks. Operators use watermarks to 
> compute the current event-time.
> The ProcessFunctions for over range windows use the TimerServices to 
> group elements by time.
> In case of event-time, the timers are triggered by the event-time of 
> the operator which is derived from the received watermarks.
> In case of processing-time, the timers are triggered based on the 
> wallclock time of the operator.
>
> So by using event-tim timers, we implicitly rely on the watermarks 
> because the timers are triggered based on the received watermarks.
>
> Best, Fabian
>
>
> 2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi,
> >
> > I am looking at the implementation of  RowTimeBoundedRangeOver (in 
> > the context of Stream SQL). I see that the logic is that the 
> > progress happens based on the timestamps of the rowevent - i.e., 
> > when an even arrives we register to be processed based on it's 
> > timestamp
> (ctx.timerService.
> > registerEventTimeTimer(triggeringTs))
> >
> > In the onTimer we remove (retract) data that has expired. However, 
> > we do not consider watermarks nor some allowed latency for the 
> > events or anything like this, which makes me ask:
> > Don't we need to work with watermarks when we deal with even time? 
> > And keep the events within the allowed delayed/next watermark?  Am I 
> > missing something? Or maybe we do not consider at this point 
> > allowedLateness  for this version?
> >
> > Thanks
> >
> > Best regards,
> >
> >
>

Reply via email to