Hi Rafi and Till,
Thank you for pointing out that edge case, Rafi.

Till, I am trying to get this example working with the BroadcastState
pattern upstream to the window operator[1]. The problem is that introducing
the BroadcastState makes the onEventTime() *never* fire. Is the
BroadcastState somehow eating up the watermark? Do I need to generate the
watermark again in the KeyedBroadcastProcessFunction?

[1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
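
One possible explanation, offered as an assumption to check rather than a confirmed diagnosis: an operator with two inputs advances its event-time clock to the minimum of the watermarks seen on both inputs. If the broadcast stream never emits a watermark, that minimum would stay at Long.MIN_VALUE and no event-time timer downstream could fire. The sketch below models only this rule in plain Java with no Flink dependency; the class and method names are hypothetical.

```java
// Sketch only: models the "minimum of both inputs" watermark rule.
// TwoInputWatermarkSketch and its methods are hypothetical names, not Flink API.
final class TwoInputWatermarkSketch {
    private long watermark1 = Long.MIN_VALUE; // keyed data input
    private long watermark2 = Long.MIN_VALUE; // broadcast input
    private long combined = Long.MIN_VALUE;   // what downstream operators see

    /** Records a watermark from input 1 or 2 and returns the combined watermark. */
    long onWatermark(int input, long timestamp) {
        if (input == 1) {
            watermark1 = Math.max(watermark1, timestamp);
        } else {
            watermark2 = Math.max(watermark2, timestamp);
        }
        // The combined watermark never moves backwards.
        combined = Math.max(combined, Math.min(watermark1, watermark2));
        return combined;
    }

    /** An event-time timer fires once the combined watermark reaches its timestamp. */
    boolean timerWouldFire(long timerTimestamp) {
        return combined >= timerTimestamp;
    }
}
```

In this model, advancing only input 1 leaves every timer pending; the timers become eligible once input 2 also reports a watermark, for example Long.MAX_VALUE for a stream whose event time should never hold the operator back.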

Thanks,
Manas

On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Manas and Rafi,
>
> you are right that when using merging windows, as event time session
> windows are, Flink requires that any state the Trigger keeps is of
> type MergingState. This constraint ensures that the state can be merged
> whenever two windows get merged.
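
To illustrate the constraint: a plain value offers no defined way to combine two copies, whereas a reducing state carries the function needed to fold them into one. Below is a minimal sketch in plain Java with no Flink dependency. ReducingCellSketch is a hypothetical stand-in for illustration, not Flink's ReducingState.

```java
import java.util.function.BinaryOperator;

// Sketch only: a hypothetical stand-in for a mergeable per-window state.
final class ReducingCellSketch<T> {
    private final BinaryOperator<T> reduce; // how two values are folded into one
    private T value;                        // null means "nothing stored yet"

    ReducingCellSketch(BinaryOperator<T> reduce) {
        this.reduce = reduce;
    }

    /** Adds a value, folding it into whatever is already stored. */
    void add(T v) {
        value = (value == null) ? v : reduce.apply(value, v);
    }

    T get() {
        return value;
    }

    /** Called when the window owning 'other' is merged into this window. */
    void mergeFrom(ReducingCellSketch<T> other) {
        if (other.value != null) {
            add(other.value);
        }
    }
}
```

With a logical OR as the reduce function, a "has already fired" flag survives a merge: if either of the two windows had fired, the merged window's flag is true.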
>
> Rafi, you are right. With the current implementation it might happen that
> you send a wrong "started window" message. I think it depends on the
> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
> your watermark. If you want to be on the safe side, then I would recommend
> using the ProcessFunction to implement the required logic. The
> ProcessFunction [1] is Flink's low-level API and gives you access to state
> and timers. In it, you would need to buffer the elements and sessionize
> them yourself, though. However, it would give you access to the
> watermark, which in turn would allow you to properly handle your described
> edge case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
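
As a rough illustration of sessionizing by hand, the sketch below tracks one key's session in plain Java with no Flink dependency. It is a simplification under stated assumptions: events for the key arrive in timestamp order, a session ends once the watermark reaches lastEventTimestamp + inactivityPeriod, and the timestamps placed in the two messages (session start, and last event plus gap) are choices made here for illustration. SessionizerSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: single-key sessionizer, assuming in-order events.
final class SessionizerSketch {
    private final long minWindowSize;
    private final long inactivityPeriod;
    private final List<String> output = new ArrayList<>();
    private boolean open = false;
    private boolean startedEmitted = false;
    private long start;
    private long last;

    SessionizerSketch(long minWindowSize, long inactivityPeriod) {
        this.minWindowSize = minWindowSize;
        this.inactivityPeriod = inactivityPeriod;
    }

    void onElement(long timestamp) {
        if (open && timestamp - last > inactivityPeriod) {
            closeSession(); // gap exceeded before any watermark reported it
        }
        if (!open) {
            open = true;
            startedEmitted = false;
            start = timestamp;
        }
        last = timestamp;
        if (!startedEmitted && last - start > minWindowSize) {
            startedEmitted = true;
            output.add("Window started @ " + start);
        }
    }

    void onWatermark(long watermark) {
        if (open && watermark >= last + inactivityPeriod) {
            closeSession();
        }
    }

    private void closeSession() {
        output.add("Window ended @ " + (last + inactivityPeriod));
        open = false;
    }

    List<String> output() {
        return output;
    }
}
```

In a real job, onElement and onWatermark would correspond roughly to element processing and an event-time timer callback, with the fields kept in keyed state. Handling a late first event, as in Rafi's edge case, would additionally require buffering elements until the watermark confirms the session start.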
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> I think one "edge" case which is not handled is when the first
>> event (by event-time) arrives late: a wrong "started-window" would then be
>> reported.
>>
>> Rafi
>>
>>
>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Is the reason ValueState cannot be used that session windows are
>>> always formed by merging proto-windows of single elements, so a
>>> state store is needed that can handle merging? ValueState does not provide
>>> this functionality, but a ReducingState does?
>>>
>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>> Thanks for your answer! You also answered the next question that I was
>>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>>> the only (convoluted) way to share state between two operators is through
>>>> the broadcast state pattern, right?
>>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>>>> in the Trigger? I tried using it in my own example, but I am not able
>>>> to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>>
>>>>
>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> you can implement something like this with a bit of trigger magic.
>>>>> What you need to do is to define your own trigger implementation which
>>>>> keeps state to remember whether it has triggered the "started window"
>>>>> message or not. In the stateful window function you would need to do
>>>>> something similar. The first call could trigger the output of "window
>>>>> started" and any subsequent call will trigger the evaluation of the
>>>>> window.
>>>>> It would have been a bit easier if the trigger and the window process
>>>>> function could share their internal state. Unfortunately, this is not
>>>>> possible at the moment.
>>>>>
>>>>> I've drafted a potential solution which you can find here [1].
>>>>>
>>>>> [1]
>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I want to achieve the following using event time session windows:
>>>>>>
>>>>>>    1. When the difference between window.getStart() and the last event
>>>>>>    timestamp in the window is greater than MIN_WINDOW_SIZE milliseconds,
>>>>>>    I want to emit a message "Window started @ timestamp".
>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>    lastEventTimestamp + inactivityPeriod, I want to emit a message
>>>>>>    "Window ended @ timestamp".
>>>>>>
>>>>>> It is guaranteed that all events are on time and no lateness is
>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>> if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>> triggers a customProcessWindowFunction().
>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>
>>>>>> Is my approach correct or is there a completely different method to
>>>>>> achieve this?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas Kale
>>>>>>
>>>>>>
>>>>>>
>>>>>>
