Hi Padarn,

Flink does not generate watermarks per key. At the moment, watermarks are
always global. Therefore, I suspect it is rather a problem with generating
watermarks at all. Could it be that your input data does not span a period
longer than 5 minutes and also does not terminate? Another problem could be
the CountTrigger, which does not react to the window's end time. Its
onEventTime method simply returns TriggerResult.CONTINUE, and I think this
will cause the window not to fire. A working example program with example
input would be helpful for further debugging.
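
To illustrate the first point, here is a minimal plain-Java sketch (not Flink
code; the class WatermarkSketch and its method onWatermark are made up for
illustration) of how I would model an operator's event-time clock. It assumes
the operator follows the minimum of the latest watermarks received on its
input channels and has no notion of keys.

```java
import java.util.Arrays;

// Plain-Java model, not the Flink API: an operator remembers the latest
// watermark seen on each input channel and uses the minimum of those values
// as its own event-time clock. Keys play no role here.
public class WatermarkSketch {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public WatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's watermark. */
    public long onWatermark(int channel, long watermark) {
        // Watermarks never move backwards on a channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The operator's watermark never moves backwards either.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkSketch op = new WatermarkSketch(2);
        // Channel 1 has sent nothing yet, so the result is still Long.MIN_VALUE.
        System.out.println(op.onWatermark(0, 1_000L));
        // Both channels have reported; the slower one (400) decides.
        System.out.println(op.onWatermark(1, 400L));
        // Channel 0 advancing further does not help while channel 1 stays at 400.
        System.out.println(op.onWatermark(0, 2_000L));
    }
}
```

In this model a single input that stops advancing holds the operator's
watermark back, regardless of which keys its records carry.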

Cheers,
Till

On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:

> Hi Flink Mailing List,
>
> Long story short - I want to somehow collapse watermarks at an operator
> across keys, so that keys with lagging watermarks do not hold the others back.
> Details below:
>
> ---
>
> I have an application in which I want to perform the following sequence of
> steps. Assume each record in my data has the fields (time, user, location,
> action):
>
> -> Read source
> -> KeyBy (UserId, Location)
> -> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
> -> TriggerOnFirst event
> -> KeyBy (Location)
> -> SlidingEventTimeWindow (5 min length, 5 second slide)
> -> Count
>
> The end intention is to count the number of unique users in a given
> location - the EventTimeSessionWindow is used to make sure users are only
> counted once.
>
> So I created a custom Trigger, which is the same as CountTrigger, but has
> the following `TriggerResult` function:
>
> @Override
> public TriggerResult onElement(Object element, long timestamp, W window,
>     TriggerContext ctx) throws Exception {
>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>   count.add(1L);
>   if (count.get() == maxCount) {
>     return TriggerResult.FIRE_AND_PURGE;
>   } else if (count.get() > maxCount) {
>     return TriggerResult.PURGE;
>   }
>   return TriggerResult.CONTINUE;
> }
>
> But my final SlidingEventTimeWindow does not fire properly. I assume this
> is because some users have session windows that are not yet closed, so the
> watermark for those keys is running behind and the
> SlidingEventTimeWindow watermark is held back too.
>
> What I feel like I want to achieve is essentially setting the watermark of
> the SlidingEventTimeWindow operator to be the maximum (with lateness) of
> the input keys, rather than the minimum, but I cannot tell if this is
> possible, and if not, what another approach could be.
>
> Thanks,
> Padarn
>
