Just to add: by printing intermediate results I see that I definitely have more than five minutes of data, and by windowing without the session windows I see that event time watermarks do seem to be generated as expected.
Thanks for your help and time.

Padarn

On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:

> Hi Till,
>
> I will work on an example, but I’m a little confused by how keyBy and
> watermarks work in this case. This documentation
> (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams)
> says:
>
> > Some operators consume multiple input streams; a union, for example, or
> > operators following a *keyBy(…)* or *partition(…)* function. Such an
> > operator’s current event time is the minimum of its input streams’ event
> > times. As its input streams update their event times, so does the operator.
>
> Doesn't this imply that keyBy splits the watermark?
>
> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Padarn,
>>
>> Flink does not generate watermarks per key. At the moment, watermarks are
>> always global. Therefore, I would suspect that the problem is rather with
>> generating watermarks at all. Could it be that your input data does not
>> span a period longer than 5 minutes and also does not terminate? Another
>> problem could be the CountTrigger, which does not react to the window's
>> end time: its onEventTime method simply returns TriggerResult.CONTINUE,
>> and I think this will cause the window not to fire. Maybe a working
>> example program with example input could help with further debugging.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Hi Flink Mailing List,
>>>
>>> Long story short: I want to somehow collapse watermarks at an operator
>>> across keys, so that keys with lagging watermarks do not hold the
>>> operator back.
>>>
>>> Details below:
>>>
>>> ---
>>>
>>> I have an application in which I want to perform the following sequence
>>> of steps, assuming each record has the form (time, user, location,
>>> action):
>>>
>>> -> Read source
>>> -> KeyBy (UserId, Location)
>>> -> EventTimeSessionWindow (5 min gap) - results in (User, Location,
>>>    Session)
>>> -> Trigger on first event
>>> -> KeyBy (Location)
>>> -> SlidingEventTimeWindow (5 min length, 5 s slide)
>>> -> Count
>>>
>>> The end goal is to count the number of unique users in a given
>>> location; the EventTimeSessionWindow is used to make sure each user is
>>> only counted once.
>>>
>>> So I created a custom Trigger that is the same as CountTrigger but has
>>> the following onElement method:
>>>
>>>     @Override
>>>     public TriggerResult onElement(Object element, long timestamp, W window,
>>>             TriggerContext ctx) throws Exception {
>>>         // Count the elements seen for this (key, window) in trigger state.
>>>         ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>         count.add(1L);
>>>         if (count.get() == maxCount) {
>>>             // Emit exactly once, when the count first reaches maxCount.
>>>             return TriggerResult.FIRE_AND_PURGE;
>>>         } else if (count.get() > maxCount) {
>>>             // Drop any further elements of this session without emitting.
>>>             return TriggerResult.PURGE;
>>>         }
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>
>>> But my final SlidingEventTimeWindow does not fire properly. I assume
>>> this is because some users have session windows that are not yet
>>> closed, so the watermark for those keys runs behind and holds back the
>>> SlidingEventTimeWindow watermark too.
>>>
>>> What I want to achieve is essentially to set the watermark of the
>>> SlidingEventTimeWindow operator to the maximum (allowing for lateness)
>>> across the input keys, rather than the minimum. I cannot tell whether
>>> this is possible and, if not, what another approach could be.
>>>
>>> Thanks,
>>> Padarn
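To make the documented rule concrete, here is a minimal plain-Java model (not Flink code) of how one parallel instance of an operator after keyBy could derive its event time. It assumes each upstream subtask sends its watermarks to every downstream instance; the instance remembers the latest watermark per input channel and advances to the minimum across channels. Keys never enter the computation, which is Till's point that watermarks are not tracked per key. The class name `ChannelWatermarkTracker` is hypothetical, and the sketch ignores refinements such as idle inputs.

```java
import java.util.Arrays;

/**
 * Hypothetical model of one downstream operator instance after keyBy.
 * It keeps the latest watermark received on each input channel (one
 * channel per upstream parallel subtask) and exposes the minimum as
 * its current event time. Keys play no role here.
 */
class ChannelWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    ChannelWatermarkTracker(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input channel and returns the
     * operator's event time, which never moves backwards.
     */
    long onWatermark(int channel, long watermark) {
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > currentWatermark) {
            currentWatermark = min;
        }
        return currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two upstream subtasks feed this instance.
        ChannelWatermarkTracker tracker = new ChannelWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 10_000)); // -9223372036854775808: channel 1 has sent nothing yet
        System.out.println(tracker.onWatermark(1, 4_000));  // 4000 = min(10000, 4000)
        System.out.println(tracker.onWatermark(1, 12_000)); // 10000 = min(10000, 12000)
    }
}
```

Because the minimum is taken over input channels rather than keys, in this model a lagging upstream subtask holds the operator back, while an open session window for one user does not.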
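The decision logic of the quoted onElement can be modelled without a Flink dependency. This sketch assumes one model instance per (key, session window) and that purging clears the window contents but not the trigger's count state, which is how Flink's WindowOperator separates the two. `FireOnFirstModel` and its `Decision` enum are hypothetical stand-ins for the custom trigger and for a subset of Flink's TriggerResult. It shows the session firing once, on its first element, and the event-time callback, like CountTrigger's, never firing, which is the behaviour Till points out.

```java
/**
 * Hypothetical plain-Java model of the quoted custom trigger for one
 * (key, session window) pair. Decision mirrors a subset of Flink's
 * TriggerResult values; it is not the Flink class.
 */
class FireOnFirstModel {
    enum Decision { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    // Models the trigger's ReducingState<Long>: a PURGE clears the window
    // contents, not this counter.
    private long count = 0;

    FireOnFirstModel(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Same branching as the quoted onElement. */
    Decision onElement() {
        count += 1;
        if (count == maxCount) {
            return Decision.FIRE_AND_PURGE;
        } else if (count > maxCount) {
            return Decision.PURGE;
        }
        return Decision.CONTINUE;
    }

    /** Like CountTrigger.onEventTime: the window's end time is ignored. */
    Decision onEventTime(long time) {
        return Decision.CONTINUE;
    }

    public static void main(String[] args) {
        FireOnFirstModel session = new FireOnFirstModel(1);
        System.out.println(session.onElement());          // FIRE_AND_PURGE
        System.out.println(session.onElement());          // PURGE
        System.out.println(session.onEventTime(300_000)); // CONTINUE
    }
}
```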
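Padarn's last paragraph asks for a watermark driven by the maximum rather than the minimum. As an illustration only, this plain-Java sketch computes such a watermark from the records an operator observes: the largest event timestamp seen so far minus a fixed bound, the same idea behind Flink's BoundedOutOfOrdernessTimestampExtractor. The class name, the bound, and the sample timestamps are hypothetical, and the sketch makes no claim about what Flink's window operators do internally.

```java
/**
 * Hypothetical sketch: a watermark equal to the maximum event timestamp
 * seen so far minus a fixed lateness bound. Any record at or behind the
 * resulting watermark would be treated as late downstream.
 */
class MaxMinusLatenessWatermark {
    private final long latenessBoundMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    MaxMinusLatenessWatermark(long latenessBoundMs) {
        this.latenessBoundMs = latenessBoundMs;
    }

    /** Observes one record's event timestamp; older records do not lower the max. */
    void onElement(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** Current watermark; stays at Long.MIN_VALUE (avoiding overflow) until a record arrives. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - latenessBoundMs;
    }

    /** A watermark t declares that no more records with timestamp <= t are expected. */
    boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        MaxMinusLatenessWatermark wm = new MaxMinusLatenessWatermark(60_000); // 1 min bound
        wm.onElement(600_000);
        wm.onElement(540_000); // an older record does not pull the watermark back
        System.out.println(wm.currentWatermark()); // 540000
        System.out.println(wm.isLate(530_000));    // true
    }
}
```

The trade-off is the one Padarn's "(allowing for lateness)" hints at: a record that arrives more than the bound behind the newest record seen is treated as late and, in a windowed pipeline, would typically be dropped.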