Hi Flink Mailing List,

Long story short: I want to somehow combine watermarks across keys at an
operator, so that keys whose watermarks lag do not hold the operator back.
Details below:

---

I have an application in which I want to perform the following sequence of
steps. Assume each record has the fields (time, user, location, action):

-> Read source
-> KeyBy (UserId, Location)
-> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
-> TriggerOnFirst event
-> KeyBy (Location)
-> SlidingEventTimeWindow(5 min length, 5 second slide)
-> Count

The end intention is to count the number of unique users in a given
location - the EventTimeSessionWindow is used to make sure users are only
counted once.

So I created a custom Trigger, which is the same as CountTrigger, but has
the following onElement function (the one that returns the TriggerResult):

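@Override
public TriggerResult onElement(Object element, long timestamp, W window,
    TriggerContext ctx) throws Exception {
  // Per-key, per-window element counter kept in partitioned state.
  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if (count.get() == maxCount) {
    // The maxCount-th element: emit the window result and drop its contents.
    return TriggerResult.FIRE_AND_PURGE;
  } else if (count.get() > maxCount) {
    // Any later element: drop the contents without emitting again.
    return TriggerResult.PURGE;
  }
  return TriggerResult.CONTINUE;
}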
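
To make the intended behaviour of that method concrete, here is a toy model
of its decision logic in plain Java. It does not use Flink at all: the
Result enum and the FirstEventTriggerSketch class are hypothetical stand-ins
for TriggerResult and the custom trigger, and the plain long field stands in
for the ReducingState counter. It also assumes the counter is not reset when
the window contents are purged, since the method above never clears it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; none of these names come from Flink.
class FirstEventTriggerSketch {
    // Stand-in for the three TriggerResult values used in onElement.
    enum Result { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    private long count = 0; // stand-in for the ReducingState<Long> counter

    FirstEventTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Same branching as the onElement method above, minus the Flink types.
    Result onElement() {
        count += 1;
        if (count == maxCount) {
            return Result.FIRE_AND_PURGE;
        } else if (count > maxCount) {
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }

    // Feeds `elements` events into a fresh trigger and records each decision.
    static List<Result> decisions(long maxCount, int elements) {
        FirstEventTriggerSketch trigger = new FirstEventTriggerSketch(maxCount);
        List<Result> out = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            out.add(trigger.onElement());
        }
        return out;
    }

    public static void main(String[] args) {
        // With maxCount = 1, only the first event of a window is emitted.
        System.out.println(decisions(1, 3)); // [FIRE_AND_PURGE, PURGE, PURGE]
    }
}
```

With maxCount = 1 this gives the "trigger on first event" behaviour from the
step list: one result per session, and everything after it is dropped.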

But my final SlidingEventTimeWindow does not fire properly. I assume this is
because some users have session windows that are not yet closed, so the
watermark for those keys is running behind and the SlidingEventTimeWindow
watermark is held back too.

What I think I want is essentially to set the watermark of the
SlidingEventTimeWindow operator to the maximum (with lateness) of the input
keys, rather than the minimum, but I cannot tell whether this is possible
and, if not, what another approach could be.

Thanks,
Padarn
