Operators with multiple inputs emit the minimum of their inputs' watermarks downstream. In the case of a keyBy, this means that each upstream subtask sends its watermark to all downstream consumers.
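To make the rule above concrete, here is a minimal, Flink-free Java sketch. It assumes a downstream operator with one input channel per upstream subtask. The class `MinWatermarkTracker` and its methods are hypothetical names, not Flink API, and real Flink additionally handles details such as idle channels. The operator keeps the latest watermark seen per channel and only advances to the minimum; `broadcast` models a keyBy, where every upstream subtask forwards its watermark to every downstream subtask.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink's API: how an operator with several input
// channels derives its event-time clock from per-channel watermarks.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark has been received on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark arriving on one channel and returns the operator's
    // watermark afterwards. The operator's watermark never moves backwards.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        current = Math.max(current, min);
        return current;
    }

    long current() {
        return current;
    }

    // Models a keyBy: one upstream subtask sends its watermark to every
    // downstream subtask, arriving on the channel with the upstream's index.
    static void broadcast(int upstreamIndex, long watermark, MinWatermarkTracker[] downstream) {
        for (MinWatermarkTracker d : downstream) {
            d.onWatermark(upstreamIndex, watermark);
        }
    }
}
```

For example, with two upstream subtasks at 100 and 40, every downstream subtask sits at 40; only when the slower upstream moves past 100 does the downstream clock reach 100. A single slow upstream subtask therefore holds back all downstream subtasks, regardless of which keys it carries.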
Cheers,
Till

On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:

> Just to add: by printing intermediate results I see that I definitely have more than five minutes of data, and by windowing without the session windows I see that event time watermarks do seem to be generated as expected.
>
> Thanks for your help and time.
>
> Padarn
>
> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>
>> Hi Till,
>>
>> I will work on an example, but I’m a little confused by how keyBy and watermarks work in this case. This documentation (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams) says:
>>
>> Some operators consume multiple input streams; a union, for example, or operators following a *keyBy(…)* or *partition(…)* function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.
>>
>> This implies to me that the keyBy splits the watermark?
>>
>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Padarn,
>>>
>>> Flink does not generate watermarks per key. At the moment, watermarks are always global. Therefore, I would suspect that it is rather a problem with generating watermarks at all. Could it be that your input data does not span a period longer than 5 minutes and also does not terminate? Another problem could be the CountTrigger, which does not react to the window's end time: its onEventTime method simply returns TriggerResult.CONTINUE, and I think this will cause the window not to fire. Maybe a working example program with example input could be helpful for further debugging.
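Till's two diagnostics can be illustrated with another hypothetical Java sketch; the names and the exact watermark formula are assumptions, not Flink's implementation. A bounded-out-of-orderness generator for one source instance derives its watermark from the largest timestamp seen across all records, ignoring the key. An event-time window [start, end) only fires once the watermark reaches end - 1, mirroring how Flink's event-time trigger fires at the window's maxTimestamp. If the input covers only three minutes and never advances further, a five-minute window never fires.

```java
// Hypothetical sketch of a periodic, bounded-out-of-orderness watermark
// generator for ONE source instance. Not Flink's implementation: it only
// shows that the watermark depends on all records seen, not on keys.
final class BoundedLatenessWatermarks {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every record. The key is deliberately ignored: a key that
    // stops producing data cannot hold this watermark back.
    void onEvent(String key, long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    // Watermark that would be emitted on the next periodic tick.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestamp - maxOutOfOrdernessMs;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    static boolean windowWouldFire(long windowEndMs, long watermark) {
        return watermark >= windowEndMs - 1;
    }
}
```

Only newer timestamps on the source instance as a whole move this watermark forward, which is why input that stops short of the window end leaves the window open.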
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi Flink Mailing List,
>>>>
>>>> Long story short: I want to somehow collapse watermarks at an operator across keys, so that keys with lagging watermarks do not hold the others back. Details below:
>>>>
>>>> ---
>>>>
>>>> I have an application in which I want to perform the following sequence of steps. Assume my data is made up of records that have: (time, user, location, action)
>>>>
>>>> -> Read source
>>>> -> KeyBy (UserId, Location)
>>>> -> EventTimeSessionWindow (5 min gap) - results in (user, location, session)
>>>> -> Trigger on first event
>>>> -> KeyBy (Location)
>>>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>>>> -> Count
>>>>
>>>> The end intention is to count the number of unique users in a given location; the EventTimeSessionWindow is used to make sure users are only counted once.
>>>>
>>>> So I created a custom Trigger, which is the same as CountTrigger, but has the following `onElement` function:
>>>>
>>>> @Override
>>>> public TriggerResult onElement(Object element, long timestamp, W window,
>>>>         TriggerContext ctx) throws Exception {
>>>>     ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>     count.add(1L);
>>>>     if (count.get() == maxCount) {
>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>     } else if (count.get() > maxCount) {
>>>>         return TriggerResult.PURGE;
>>>>     }
>>>>     return TriggerResult.CONTINUE;
>>>> }
>>>>
>>>> But my final SlidingEventTimeWindow does not fire properly. This is because (I assume) there are some users with session windows that are not closed, and so the watermark for those keys is running behind, and so the SlidingEventTimeWindow watermark is held back too.
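As an in-memory reference for the stated goal, the following plain-Java sketch (not a Flink job) counts distinct users per location inside one sliding window, and lists which 5-minute windows with a 5-second slide contain a given timestamp. `Event`, `UniqueUsers` and their methods are hypothetical. The sketch counts users with at least one event in the window, which may differ from counting session starts as the pipeline above does. `windowStartsFor` is intended to mirror the start alignment of sliding event-time windows with zero offset, assuming non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical record mirroring the (time, user, location, action) tuples.
final class Event {
    final long timeMs;
    final String user;
    final String location;
    final String action;

    Event(long timeMs, String user, String location, String action) {
        this.timeMs = timeMs;
        this.user = user;
        this.location = location;
        this.action = action;
    }
}

final class UniqueUsers {
    // Distinct users per location among events with start <= time < start + size.
    static Map<String, Integer> countInWindow(List<Event> events, long windowStartMs, long sizeMs) {
        Map<String, Set<String>> usersByLocation = new HashMap<>();
        for (Event e : events) {
            if (e.timeMs >= windowStartMs && e.timeMs < windowStartMs + sizeMs) {
                // A Set makes repeated events from the same user count once.
                usersByLocation.computeIfAbsent(e.location, k -> new HashSet<>()).add(e.user);
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        usersByLocation.forEach((loc, users) -> counts.put(loc, users.size()));
        return counts;
    }

    // Starts of all windows [start, start + size) containing t, aligned to the slide.
    static List<Long> windowStartsFor(long t, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, slideMs);
        for (long s = lastStart; s > t - sizeMs; s -= slideMs) {
            starts.add(s);
        }
        return starts;
    }
}
```

With a 5-minute length and a 5-second slide, every record belongs to 300 / 5 = 60 overlapping windows.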
>>>>
>>>> What I feel like I want to achieve is essentially setting the watermark of the SlidingEventTimeWindow operator to be the maximum (with lateness) of the input keys, rather than the minimum, but I cannot tell if this is possible, and if not, what another approach could be.
>>>>
>>>> Thanks,
>>>> Padarn
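The alternative proposed here can be compared with the minimum rule in a small hypothetical sketch (plain Java, not a Flink API). `maxMinusLatenessRule` advances to the most advanced channel minus an allowed lateness. This keeps the sliding windows moving, but any later record from a lagging channel then falls behind the watermark, so the windows it belongs to may already have fired.

```java
// Hypothetical comparison of two ways to combine per-channel watermarks.
// Assumes every channel has already reported a real watermark.
final class WatermarkCombine {
    // The minimum rule described in the thread.
    static long minRule(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // The proposed rule: most advanced channel minus an allowed lateness.
    static long maxMinusLatenessRule(long[] channelWatermarks, long latenessMs) {
        long max = Long.MIN_VALUE;
        for (long wm : channelWatermarks) {
            max = Math.max(max, wm);
        }
        return max - latenessMs;
    }

    // A record whose timestamp is at or below the watermark is behind the
    // operator's event-time clock, i.e. late.
    static boolean isLate(long timestampMs, long watermark) {
        return timestampMs <= watermark;
    }
}
```

With channel watermarks of 600 s and 120 s and one minute of lateness, the proposed rule yields 540 s, so a record at 200 s from the lagging channel is already late, while under the minimum rule it is not.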