Okay. I think I still must misunderstand something here. I will work on
building a unit test around this, hopefully this clears up my confusion.

Thank you,
Padarn

On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Operator's with multiple inputs emit the minimum of the input's watermarks
> downstream. In case of a keyBy this means that the watermark is sent to all
> downstream consumers.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:
>
>> Just to add: by printing intermediate results I see that I definitely
>> have more than five minutes of data, and by windowing without the session
>> windows I see that event time watermarks do seem to be generated as
>> expected.
>>
>> Thanks for your help and time.
>>
>> Padarn
>>
>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> I will work on an example, but I’m a little confused by how keyBy and
>>> watermarks work in this case. This documentation says (
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>>> ):
>>>
>>>
>>> Some operators consume multiple input streams; a union, for example, or
>>> operators following a *keyBy(…)*or *partition(…)* function. Such an
>>> operator’s current event time is the minimum of its input streams’ event
>>> times. As its input streams update their event times, so does the operator.
>>>
>>>
>>> This implies to me that the keyBy splits the watermark?
>>>
>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Flink does not generate watermarks per keys. Atm watermarks are always
>>>> global. Therefore, I would suspect that it is rather a problem with
>>>> generating watermarks at all. Could it be that your input data does not
>>>> span a period longer than 5 minutes and also does not terminate? Another
>>>> problem could be the CountTrigger which should not react to the window's
>>>> end time. The method onEventTime simply returns TriggerResult.CONTINUE and
>>>> I think this will cause the window to not fire. Maybe a working example
>>>> program with example input could be helpful for further debugging.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>>> Hi Flink Mailing List,
>>>>>
>>>>> Long story short - I want to somehow collapse watermarks at an
>>>>> operator across keys, so that keys with dragging watermarks do not drag
>>>>> behind. Details below:
>>>>>
>>>>> ---
>>>>>
>>>>> I have an application in which I want to perform the follow sequence
>>>>> of steps: Assume my data is made up of data that has: (time, user,
>>>>> location, action)
>>>>>
>>>>> -> Read source
>>>>> -> KeyBy (UserId, Location)
>>>>> -> EventTimeSessionWindow (5 min gap) - results in (User Location
>>>>> Session)
>>>>> -> TriggerOnFirst event
>>>>> -> KeyBy (Location)
>>>>> -> SlidingEventTimeWindow(5min length, 5 second gap)
>>>>> -> Count
>>>>>
>>>>> The end intention is to count the number of unique users in a given
>>>>> location - the EventTimeSessionWindow is used to make sure users are only
>>>>> counted once.
>>>>>
>>>>> So I created a custom Trigger, which is the same as CountTrigger, but
>>>>> has the following `TriggerResult" funtion:
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Object element, long timestamp, W window, 
>>>>> TriggerContext ctx) throws Exception {
>>>>>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>>   count.add(1L);
>>>>>   if (count.get() == maxCount) {
>>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>>   } else if (count.get() > maxCount) {
>>>>>     return TriggerResult.PURGE;
>>>>>   }
>>>>>   return TriggerResult.CONTINUE;
>>>>>
>>>>> }
>>>>>
>>>>> But my final SlidingEventTimeWindow does not fire properly. This is
>>>>> because (I assume) there are some users with sessions windows that are not
>>>>> closed, and so the watermark for those keys is running behind and so the
>>>>> SlidingEventTimeWindow watermark is held back too.
>>>>>
>>>>> What I feel like I want to achieve is essentially setting the
>>>>> watermark of the SlidingEventTimeWindow operator to be the maximum (with
>>>>> lateness) of the input keys, rather than the minimum, but I cannot tell if
>>>>> this is possible, and if not, what another approach could be.
>>>>>
>>>>> Thanks,
>>>>> Padarn
>>>>>
>>>>

Reply via email to