Okay. I think I must still be misunderstanding something here. I will work on building a unit test around this; hopefully that clears up my confusion.
Thank you,
Padarn

On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Operators with multiple inputs emit the minimum of their inputs' watermarks
> downstream. In the case of a keyBy, this means that the watermark is sent to
> all downstream consumers.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:
>
>> Just to add: by printing intermediate results I see that I definitely
>> have more than five minutes of data, and by windowing without the session
>> windows I see that event time watermarks do seem to be generated as
>> expected.
>>
>> Thanks for your help and time.
>>
>> Padarn
>>
>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> I will work on an example, but I’m a little confused by how keyBy and
>>> watermarks work in this case. This documentation says (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>>> ):
>>>
>>> Some operators consume multiple input streams; a union, for example, or
>>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>>> operator’s current event time is the minimum of its input streams’ event
>>> times. As its input streams update their event times, so does the operator.
>>>
>>> This implies to me that the keyBy splits the watermark?
>>>
>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Flink does not generate watermarks per key. At the moment, watermarks are
>>>> always global. Therefore, I would suspect that it is rather a problem with
>>>> generating watermarks at all. Could it be that your input data does not
>>>> span a period longer than 5 minutes and also does not terminate? Another
>>>> problem could be the CountTrigger, which does not react to the window's
>>>> end time.
>>>> The method onEventTime simply returns TriggerResult.CONTINUE, and
>>>> I think this will cause the window to not fire. Maybe a working example
>>>> program with example input could be helpful for further debugging.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>>> Hi Flink Mailing List,
>>>>>
>>>>> Long story short: I want to somehow collapse watermarks at an
>>>>> operator across keys, so that keys with lagging watermarks do not hold
>>>>> the others back. Details below:
>>>>>
>>>>> ---
>>>>>
>>>>> I have an application in which I want to perform the following sequence
>>>>> of steps. Assume my data is made up of records of the form (time, user,
>>>>> location, action):
>>>>>
>>>>> -> Read source
>>>>> -> KeyBy (UserId, Location)
>>>>> -> EventTimeSessionWindow (5 min gap) - results in (User, Location)
>>>>> sessions
>>>>> -> Trigger on first event
>>>>> -> KeyBy (Location)
>>>>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>>>>> -> Count
>>>>>
>>>>> The end intention is to count the number of unique users in a given
>>>>> location; the EventTimeSessionWindow is used to make sure users are only
>>>>> counted once.
>>>>>
>>>>> So I created a custom Trigger, which is the same as CountTrigger, but
>>>>> has the following `onElement` method:
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Object element, long timestamp, W window,
>>>>>         TriggerContext ctx) throws Exception {
>>>>>     ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>>     count.add(1L);
>>>>>     if (count.get() == maxCount) {
>>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>>     } else if (count.get() > maxCount) {
>>>>>         return TriggerResult.PURGE;
>>>>>     }
>>>>>     return TriggerResult.CONTINUE;
>>>>> }
>>>>>
>>>>> But my final SlidingEventTimeWindow does not fire properly.
>>>>> This is because (I assume) there are some users with session windows
>>>>> that are not closed, so the watermark for those keys is running behind
>>>>> and the SlidingEventTimeWindow watermark is held back too.
>>>>>
>>>>> What I feel I want to achieve is essentially setting the
>>>>> watermark of the SlidingEventTimeWindow operator to be the maximum (with
>>>>> lateness) of the input keys, rather than the minimum, but I cannot tell if
>>>>> this is possible, and if not, what another approach could be.
>>>>>
>>>>> Thanks,
>>>>> Padarn
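To make the rule Till describes concrete, here is a small plain-Java model of it. It is a sketch under stated assumptions, not Flink code: the class `MinWatermarkTracker` and its methods are hypothetical, and the model ignores idle inputs and watermark status. It keeps one watermark per input channel (one channel per upstream subtask after a keyBy) and forwards the minimum across channels whenever that minimum advances. Keys never appear: a channel carries records for many keys but only one watermark.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified model of the "minimum of the inputs" rule; it is
// not Flink's implementation and ignores idle inputs and watermark status.
final class MinWatermarkTracker {
    // Latest watermark received on each input channel (one per upstream subtask).
    private final long[] channelWatermarks;
    // Watermark this operator has forwarded downstream so far.
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one channel; returns the operator's new
    // watermark if the minimum across all channels advanced, else empty.
    OptionalLong onWatermark(int channel, long watermark) {
        if (watermark <= channelWatermarks[channel]) {
            return OptionalLong.empty(); // per-channel watermarks only move forward
        }
        channelWatermarks[channel] = watermark;
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two upstream subtasks feed this subtask after a keyBy. Each channel
        // carries records for many keys but has exactly one watermark.
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 100)); // channel 1 still silent: no change
        System.out.println(tracker.onWatermark(1, 50));  // minimum advances to 50
        System.out.println(tracker.onWatermark(1, 200)); // minimum is now channel 0's 100
    }
}
```

In this model, an open session window for one (user, location) key cannot hold back the downstream watermark on its own; only a channel whose watermark lags can. That is consistent with Till's point that watermarks are not tracked per key.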