Hi Rafi and Till,

Thank you for pointing out that edge case, Rafi.

Till, I am trying to get this example working with the BroadcastState pattern upstream of the window operator [1]. The problem is that introducing the BroadcastState makes onEventTime() *never* fire. Is the BroadcastState somehow eating up the watermark? Do I need to generate the watermark again in the KeyedBroadcastProcessFunction?

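One possible explanation, sketched as a plain-Java simulation rather than Flink code: an operator with two inputs, such as a KeyedBroadcastProcessFunction, advances its event-time clock to the minimum of the watermarks received on its inputs. If the broadcast side never emits a watermark, that minimum stays at Long.MIN_VALUE and event-time timers downstream cannot fire. The class and method names below are hypothetical and only model that rule; whether this is actually the cause in the linked gist is an assumption.

```java
// Hypothetical model of a two-input operator's event-time clock (not Flink API).
class TwoInputWatermarkModel {
    private long dataWatermark = Long.MIN_VALUE;      // keyed (data) input
    private long broadcastWatermark = Long.MIN_VALUE; // broadcast input

    // Watermarks on each input only ever move forward.
    void onDataWatermark(long wm) {
        dataWatermark = Math.max(dataWatermark, wm);
    }

    void onBroadcastWatermark(long wm) {
        broadcastWatermark = Math.max(broadcastWatermark, wm);
    }

    // The operator's own watermark is the minimum over both inputs.
    long currentWatermark() {
        return Math.min(dataWatermark, broadcastWatermark);
    }

    // An event-time timer fires once the operator watermark reaches it.
    boolean timerFires(long timerTimestamp) {
        return currentWatermark() >= timerTimestamp;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();

        // Only the data input has watermarks: the minimum is still Long.MIN_VALUE.
        op.onDataWatermark(10_000L);
        System.out.println(op.timerFires(5_000L)); // false

        // Once the broadcast input also reports a watermark, the data side drives time.
        op.onBroadcastWatermark(Long.MAX_VALUE);
        System.out.println(op.timerFires(5_000L)); // true
    }
}
```

If this is what is happening, assigning watermarks to the broadcast stream as well (for a pure control stream, possibly one that is always Long.MAX_VALUE) may be enough, with no need to regenerate watermarks inside the function.
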
[1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49

Thanks,
Manas

On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Manas and Rafi,
>
> you are right that when using merging windows, as event time session
> windows are, Flink requires that any state the Trigger keeps is of
> type MergingState. This constraint allows the state to be merged
> whenever two windows get merged.
>
> Rafi, you are right. With the current implementation it might happen that
> you send a wrong started window message. I think it depends on the
> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
> your watermark. If you want to be on the safe side, then I would recommend
> using the ProcessFunction to implement the required logic. The
> ProcessFunction [1] is Flink's low-level API and gives you access to state
> and timers. In it, you would need to buffer the elements and sessionize
> them yourself, though. However, it would give you access to the
> watermark, which in turn would allow you to properly handle your described
> edge case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> I think one "edge" case which is not handled would be that the first
>> event (by event-time) arrives late; then a wrong "started-window" would be
>> reported.
>>
>> Rafi
>>
>>
>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Is the reason ValueState cannot be used that session windows are
>>> always formed by merging proto-windows of single elements, so a
>>> state store is needed that can handle merging? ValueState does not provide
>>> this functionality, but a ReducingState does?
>>>
>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>> Thanks for your answer! You also answered the next question that I was
>>>> about to ask: "Can we share state between a Trigger and a Window?" Currently
>>>> the only (convoluted) way to share state between two operators is through
>>>> the broadcast state pattern, right?
>>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>>>> in the Trigger? I tried using it in my own example, but I am not able
>>>> to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> you can implement something like this with a bit of trigger magic.
>>>>> What you need to do is to define your own trigger implementation which
>>>>> keeps state to remember whether it has triggered the "started window"
>>>>> message or not. In the stateful window function you would need to do
>>>>> something similar. The first call could trigger the output of "window
>>>>> started" and any subsequent call will trigger the evaluation of the
>>>>> window.
>>>>> It would have been a bit easier if the trigger and the window process
>>>>> function could share their internal state. Unfortunately, this is not
>>>>> possible at the moment.
>>>>>
>>>>> I've drafted a potential solution which you can find here [1].
>>>>>
>>>>> [1]
>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I want to achieve the following using event time session windows:
>>>>>>
>>>>>> 1. When the difference between window.getStart() and the last event
>>>>>> timestamp in the window is greater than MIN_WINDOW_SIZE milliseconds,
>>>>>> I want to emit a message "Window started @ timestamp".
>>>>>> 2. When the session window ends, i.e. the watermark passes
>>>>>> lastEventTimestamp + inactivityPeriod, I want to emit a message
>>>>>> "Window ended @ timestamp".
>>>>>>
>>>>>> It is guaranteed that all events are on time and no lateness is
>>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>> if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>> triggers a customProcessWindowFunction().
>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>
>>>>>> Is my approach correct or is there a completely different method to
>>>>>> achieve this?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas Kale
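
The two requirements in the original question can be sketched outside Flink as a small single-key state machine, in the spirit of the ProcessFunction approach suggested in the thread. This is a plain-Java simulation under stated assumptions: events arrive in timestamp order (as the original post guarantees), "started" reports the session start, and "ended" reports lastEventTimestamp + inactivityPeriod. The names SessionTracker, onEvent, and onWatermark are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key session tracker (plain Java, not Flink API).
class SessionTracker {
    private final long minWindowSize;    // MIN_WINDOW_SIZE in milliseconds
    private final long inactivityPeriod; // session gap in milliseconds
    private long sessionStart = -1;      // -1 means no session is open
    private long lastEventTimestamp = -1;
    private boolean startedEmitted = false;
    final List<String> output = new ArrayList<>();

    SessionTracker(long minWindowSize, long inactivityPeriod) {
        this.minWindowSize = minWindowSize;
        this.inactivityPeriod = inactivityPeriod;
    }

    // Call for every event, in timestamp order.
    void onEvent(long timestamp) {
        // An event beyond the gap closes the previous session first.
        closeIfIdleAt(timestamp);
        if (sessionStart < 0) {
            sessionStart = timestamp;
        }
        lastEventTimestamp = timestamp;
        // Requirement 1: report the start once the session is long enough.
        if (!startedEmitted && lastEventTimestamp - sessionStart > minWindowSize) {
            startedEmitted = true;
            output.add("Window started @ " + sessionStart);
        }
    }

    // Call whenever the watermark advances.
    void onWatermark(long watermark) {
        closeIfIdleAt(watermark);
    }

    // Requirement 2: the session ends once time reaches last event + gap.
    private void closeIfIdleAt(long time) {
        if (sessionStart >= 0 && time >= lastEventTimestamp + inactivityPeriod) {
            output.add("Window ended @ " + (lastEventTimestamp + inactivityPeriod));
            sessionStart = -1;
            lastEventTimestamp = -1;
            startedEmitted = false;
        }
    }

    public static void main(String[] args) {
        SessionTracker tracker = new SessionTracker(1_000L, 5_000L);
        tracker.onEvent(0L);
        tracker.onEvent(600L);
        tracker.onEvent(1_500L);     // 1500 - 0 > 1000, so the start is reported
        tracker.onWatermark(6_500L); // 1500 + 5000 reached, so the end is reported
        tracker.output.forEach(System.out::println);
        // Window started @ 0
        // Window ended @ 6500
    }
}
```

In a real job the same fields would live in keyed state and the end of the session would be detected with an event-time timer instead of an explicit onWatermark call. The sketch also reports "ended" for sessions that never reached MIN_WINDOW_SIZE, which may or may not be what is wanted.
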