Hi Manas and Rafi,

you are right: when using merging windows, as event-time session windows are, Flink requires that any state the Trigger keeps is of type MergingState. This constraint ensures that the state can be merged whenever two windows get merged.
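To make the merging constraint a bit more concrete, here is a minimal plain-Java sketch. It is not Flink API and not the code from the gist: the class SessionMergeSketch, the Session type and the OR rule are hypothetical names made up for illustration, and the rule that touching windows are merged is an assumption of the sketch. It simulates per-element proto-windows that each carry a "started message sent" flag and shows that merging the windows needs a rule for combining the flags, which is what a reducing state supplies and a plain single value does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java simulation (NOT Flink API): why per-window trigger state must be mergeable. */
class SessionMergeSketch {

    /** Hypothetical session window [start, end) with a per-window "started message sent" flag. */
    static final class Session {
        final long start;
        final long end;
        final boolean startedSent;

        Session(long start, long end, boolean startedSent) {
            this.start = start;
            this.end = end;
            this.startedSent = startedSent;
        }
    }

    /** Combine rule for the flag; plays the role of the reduce function of a reducing state. */
    static final BinaryOperator<Boolean> OR = (a, b) -> a || b;

    /**
     * Merges sessions that overlap or touch (an assumption of this sketch) and
     * combines their flags with the given rule.
     */
    static List<Session> merge(List<Session> sessions, BinaryOperator<Boolean> combine) {
        List<Session> sorted = new ArrayList<>(sessions);
        sorted.sort(Comparator.comparingLong((Session s) -> s.start));

        List<Session> out = new ArrayList<>();
        for (Session s : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && s.start <= out.get(last).end) {
                // Two windows become one, so their two flags must become one flag as well.
                Session prev = out.get(last);
                boolean flag = combine.apply(prev.startedSent, s.startedSent);
                out.set(last, new Session(prev.start, Math.max(prev.end, s.end), flag));
            } else {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Events at t=0, 5 and 30 with a session gap of 10 give three proto-windows.
        // Only the first one has already emitted its "started" message.
        List<Session> protoWindows = Arrays.asList(
                new Session(0L, 10L, true),
                new Session(5L, 15L, false),
                new Session(30L, 40L, false));

        for (Session s : merge(protoWindows, OR)) {
            System.out.println("[" + s.start + ", " + s.end + ") startedSent=" + s.startedSent);
        }
        // prints:
        // [0, 15) startedSent=true
        // [30, 40) startedSent=false
    }
}
```

With a logical OR as the combine rule, the merged window remembers that the "started" message has already gone out, so it is not sent a second time after the merge.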
Rafi, you are right. With the current implementation, a wrong "started window" message might be sent. Whether this happens depends on MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also your watermark. If you want to be on the safe side, I would recommend using the ProcessFunction to implement the required logic. The ProcessFunction [1] is Flink's low-level API and gives you access to state and timers. In it, you would need to buffer the elements and sessionize them yourself, though. However, it would give you access to the watermark, which in turn would allow you to properly handle the edge case you described.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

Cheers,
Till

On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> I think one "edge" case which is not handled would be that the first event
> (by event-time) arrives late, then a wrong "started-window" would be
> reported.
>
> Rafi
>
>
> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Is the reason ValueState cannot be used because session windows are always
>> formed by merging proto-windows of single elements, therefore a state store
>> is needed that can handle merging. ValueState does not provide this
>> functionality, but a ReducingState does?
>>
>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Till,
>>> Thanks for your answer! You also answered the next question that I was
>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>> the only (convoluted) way to share state between two operators is through
>>> the broadcast state pattern, right?
>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean>
>>> in the Trigger? I tried using it in my own example but I am not able
>>> to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>>
>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> you can implement something like this with a bit of trigger magic. What
>>>> you need to do is to define your own trigger implementation which keeps
>>>> state to remember whether it has triggered the "started window" message or
>>>> not. In the stateful window function you would need to do something
>>>> similar. The first call could trigger the output of "window started" and
>>>> any subsequent call will trigger the evaluation of the window. It would
>>>> have been a bit easier if the trigger and the window process function could
>>>> share their internal state. Unfortunately, this is not possible at the
>>>> moment.
>>>>
>>>> I've drafted a potential solution which you can find here [1].
>>>>
>>>> [1]
>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I want to achieve the following using event time session windows:
>>>>>
>>>>> 1. When the difference between window.getStart() and the last event
>>>>> timestamp in the window is greater than MIN_WINDOW_SIZE milliseconds,
>>>>> I want to emit a message "Window started @ timestamp".
>>>>> 2. When the session window ends, i.e. the watermark passes
>>>>> lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>>>>> ended @ timestamp".
>>>>>
>>>>> It is guaranteed that all events are on time and no lateness is
>>>>> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>> if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>> triggers
>>>>> a customProcessWindowFunction().
>>>>> However, with this architecture I can't detect the end of the window.
>>>>>
>>>>> Is my approach correct or is there a completely different method to
>>>>> achieve this?
>>>>>
>>>>> Thanks,
>>>>> Manas Kale
>>>>>