I wouldn't say that the batch / stream unification doesn't hold. If you make the watermark lag large enough (i.e., so that no data arrives late), you get exact results, although not very timely ones.
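To make the effect of the watermark lag concrete, here is a minimal sketch in plain Scala that does not use Flink at all. It assumes a hypothetical watermark rule (highest timestamp seen so far minus a fixed lag) and treats a record as late when its timestamp is not greater than the current watermark. The names `WatermarkLagSketch` and `lateTimestamps` and the sample arrival order are made up for illustration; this is not how Flink implements watermarks internally.

```scala
object WatermarkLagSketch {

  /** Returns the timestamps that would count as late for a given watermark lag.
    *
    * Assumption (hypothetical rule, not Flink code): the watermark is the
    * highest timestamp seen so far minus `lagMs`.
    */
  def lateTimestamps(arrivalOrder: Seq[Long], lagMs: Long): Seq[Long] = {
    var maxSeen = Long.MinValue
    val late = Seq.newBuilder[Long]
    for (ts <- arrivalOrder) {
      // Before the first record there is no watermark yet; also avoids overflow.
      val watermark = if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - lagMs
      // A record at or behind the watermark is considered late.
      if (ts <= watermark) late += ts
      maxSeen = math.max(maxSeen, ts)
    }
    late.result()
  }

  def main(args: Array[String]): Unit = {
    // Timestamps in arrival order; the two 1000L records arrive out of order.
    val arrivals = Seq(1500L, 1600L, 1000L, 2000L, 3000L, 15000L, 1000L)
    for (lag <- Seq(0L, 1000L, 20000L))
      println(s"lag=$lag ms -> late: ${lateTimestamps(arrivals, lag)}")
  }
}
```

With this arrival order, a lag of 0 flags both records with timestamp 1000 as late, a lag of 1000 flags only the last one, and a lag of 20000 flags none. The price of the larger lag is that event time advances later, so results that depend on it are emitted later as well.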
Watermarks basically trade completeness of the result for result latency. With support for late updates, you can get early (possibly incomplete) results and correct them when you receive additional (late) data. However, you need to discard state at some point in time. If you receive late data for which the required state has already been discarded, this is the point where batch and streaming results start to diverge.

Cheers, Fabian

2017-04-28 11:39 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
> Thanks again Fabian for the explanation.
> Considering what you said - is there still a duality with the batch
> case? As the stream cases are non-deterministic, I would say that the
> duality, in the sense that a query on the stream should return the same
> as the query on the batched data, does not hold anymore?
> I am just trying to get a deeper understanding of this, which I think
> will also apply to the other functions and SQL operators... sorry for
> bothering you with this.
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Friday, April 28, 2017 9:56 AM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks needed?
>
> Hi Radu,
>
> yes, that might happen in a parallel setup and depends on the "speed" of
> the parallel threads.
> An operator only advances its own event-time clock to the minimum of
> the last watermarks received from all of its input channels.
> If one input channel is "slow", the event-time of an operator lags
> behind and "late" events of the other threads are correctly processed
> because the operator's event-time was not advanced yet.
>
> So, event-time is not deterministic when it comes to which records are
> dropped.
> The watermark documentation might be helpful as well [1].
>
> Cheers,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams
>
> 2017-04-27 22:09 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Re-hi,
> >
> > I debugged the test for the event rowtime a bit.
> >
> > I tested testBoundNonPartitionedEventTimeWindowWithRange from the
> > SQLITCase class.
> >
> > I would expect that once a watermark is triggered: 1) onTimer is
> > called to process the events that arrived so far and 2) the future
> > events that arrive will be dropped. However, it seems that almost
> > the entire input can arrive in the processElement function before
> > onTimer is triggered.
> >
> > Moreover, if you modify the input to add an out-of-order event (see
> > the dataset below, where I added an event with timestamp 1000 after
> > watermark 14000), I would expect it to be dropped. However, in
> > different runs it can happen that it is not dropped. Basically, it
> > can happen that onTimer was never triggered, so when this event
> > arrives it is still registered. Is this correct? Am I missing
> > something?
> >
> >
> > @Test
> > def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
> >   // Left((timestamp, record)) is an event; Right(timestamp) is a watermark.
> >   val data = Seq(
> >     Left((1500L, (1L, 15, "Hello"))),
> >     Left((1600L, (1L, 16, "Hello"))),
> >     Left((1000L, (1L, 1, "Hello"))),
> >     Left((2000L, (2L, 2, "Hello"))),
> >     Right(1000L),
> >     Left((2000L, (2L, 2, "Hello"))),
> >     Left((2000L, (2L, 3, "Hello"))),
> >     Left((3000L, (3L, 3, "Hello"))),
> >     Right(2000L),
> >     Left((4000L, (4L, 4, "Hello"))),
> >     Right(3000L),
> >     Left((5000L, (5L, 5, "Hello"))),
> >     Right(5000L),
> >     Left((6000L, (6L, 6, "Hello"))),
> >     Left((6500L, (6L, 65, "Hello"))),
> >     Right(7000L),
> >     Left((9000L, (6L, 9, "Hello"))),
> >     Left((9500L, (6L, 18, "Hello"))),
> >     Left((9000L, (6L, 9, "Hello"))),
> >     Right(10000L),
> >     Left((10000L, (7L, 7, "Hello World"))),
> >     Left((11000L, (7L, 17, "Hello World"))),
> >     Left((11000L, (7L, 77, "Hello World"))),
> >     Right(12000L),
> >     Left((14000L, (7L, 18, "Hello World"))),
> >     Right(14000L),
> >     Left((15000L, (8L, 8, "Hello World"))),
> >     Left((1000L, (8L, 8, "Too late - Hello World"))), // event is out of order and should be dropped
> >     Right(17000L),
> >     Left((20000L, (20L, 20, "Hello World"))),
> >     Right(19000L))
> >
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > Sent: Thursday, April 27, 2017 3:17 PM
> > To: dev@flink.apache.org
> > Subject: Re: question about rowtime processfunction - are watermarks needed?
> >
> > Hi Radu,
> >
> > event-time processing requires watermarks. Operators use watermarks
> > to compute the current event-time.
> > The ProcessFunctions for over range windows use the TimerService to
> > group elements by time.
> > In case of event-time, the timers are triggered by the event-time of
> > the operator, which is derived from the received watermarks.
> > In case of processing-time, the timers are triggered based on the
> > wall-clock time of the operator.
> >
> > So by using event-time timers, we implicitly rely on the watermarks
> > because the timers are triggered based on the received watermarks.
> >
> > Best, Fabian
> >
> >
> > 2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >
> > > Hi,
> > >
> > > I am looking at the implementation of RowTimeBoundedRangeOver (in
> > > the context of Stream SQL). I see that the logic is that progress
> > > happens based on the timestamps of the row events, i.e., when an
> > > event arrives we register it to be processed based on its timestamp
> > > (ctx.timerService.registerEventTimeTimer(triggeringTs)).
> > >
> > > In onTimer we remove (retract) data that has expired. However, we
> > > do not consider watermarks, nor any allowed lateness for the
> > > events or anything like this, which makes me ask:
> > > Don't we need to work with watermarks when we deal with event time?
> > > And keep the events within the allowed delay / next watermark? Am I
> > > missing something? Or maybe we do not consider allowedLateness at
> > > this point for this version?
> > >
> > > Thanks
> > >
> > > Best regards,