Hi Till,

When I run the example code that you posted, the order of the three messages (window started, contents of window and window ended) is non-deterministic. This is surprising to me, as setParallelism(1) has been used in the pipeline - I assumed this should eliminate any form of race conditions for printing. What's more, if I *remove* setParallelism(1) from the code, the output is deterministic and correct (i.e. windowStarted -> windowContents -> windowEnded).
Clearly, something is wrong with my understanding. What is it?

On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Great to hear that you solved the problem. Let us know if you run into any
> other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> This problem is solved[1]. The issue was that the BroadcastStream did not
>> contain any watermark, which prevented watermarks for any downstream
>> operators from advancing.
>> I appreciate all the help.
>> [1]
>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>
>> Thanks,
>> Manas
>>
>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Rafi and Till,
>>> Thank you for pointing out that edge case, Rafi.
>>>
>>> Till, I am trying to get this example working with the BroadcastState
>>> pattern upstream to the window operator[1]. The problem is that introducing
>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>> watermark again in the KeyedBroadcastProcessFunction?
>>>
>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Manas and Rafi,
>>>>
>>>> you are right that when using merging windows as event time session
>>>> windows are, then Flink requires that any state the Trigger keeps is of
>>>> type MergingState. This constraint allows that the state can be merged
>>>> whenever two windows get merged.
>>>>
>>>> Rafi, you are right. With the current implementation it might happen
>>>> that you send a wrong started window message. I think it depends on the
>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>> your watermark.
>>>> If you want to be on the safe side, then I would recommend
>>>> to use the ProcessFunction to implement the required logic. The
>>>> ProcessFunction [1] is Flink's low level API and gives you access to state
>>>> and timers. In it, you would need to buffer the elements and to sessionize
>>>> them yourself, though. However, it would give you access to the
>>>> watermark which in turn would allow you to properly handle your described
>>>> edge case.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>
>>>>> I think one "edge" case which is not handled would be that the first
>>>>> event (by event-time) arrives late, then a wrong "started-window" would be
>>>>> reported.
>>>>>
>>>>> Rafi
>>>>>
>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>
>>>>>> Is the reason ValueState cannot be used that session windows are
>>>>>> always formed by merging proto-windows of single elements, and therefore a
>>>>>> state store is needed that can handle merging? ValueState does not provide
>>>>>> this functionality, but a ReducingState does?
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>> was about to ask: "Can we share state between a Trigger and a Window?"
>>>>>>> Currently the only (convoluted) way to share state between two
>>>>>>> operators is through the broadcast state pattern, right?
>>>>>>> Also, in your example, why can't we use a
>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>> example but I am not able to call the mergePartitionedState() method
>>>>>>> on a ValueStateDescriptor.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Manas
>>>>>>>
>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Manas,
>>>>>>>>
>>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>> started" and any subsequent call will trigger the evaluation of the
>>>>>>>> window.
>>>>>>>> It would have been a bit easier if the trigger and the window process
>>>>>>>> function could share their internal state. Unfortunately, this is not
>>>>>>>> possible at the moment.
>>>>>>>>
>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>>
>>>>>>>>> 1. When the difference between window.getStart() and the last event
>>>>>>>>>    timestamp in the window is greater than MIN_WINDOW_SIZE
>>>>>>>>>    milliseconds, I want to emit a message "Window started @ timestamp".
>>>>>>>>> 2. When the session window ends, i.e. the watermark passes
>>>>>>>>>    lastEventTimestamp + inactivityPeriod, I want to emit a message
>>>>>>>>>    "Window ended @ timestamp".
>>>>>>>>>
>>>>>>>>> It is guaranteed that all events are on time and no lateness is
>>>>>>>>> allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>> simultaneously.
>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>> checks if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>> window.
>>>>>>>>>
>>>>>>>>> Is my approach correct or is there a completely different method
>>>>>>>>> to achieve this?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Manas Kale
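
For reference, the two conditions from the original question (points 1 and 2) can be sketched in plain Java, without any Flink dependency. This is only an illustration and not the code from either gist. Several things here are assumptions: the names SessionSketch and sessionize are hypothetical, events are assumed to be sorted by event time (the question states all events are on time), "Window started" reports the session's start timestamp, "Window ended" reports lastEventTimestamp + inactivityPeriod, and "Window ended" is emitted only for sessions that already emitted "Window started".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of the session logic discussed in this thread.
public class SessionSketch {

    // sortedTimestamps: event timestamps in ascending order (assumption: no late events).
    // gap: the session inactivity period; minWindowSize: MIN_WINDOW_SIZE from the question.
    public static List<String> sessionize(long[] sortedTimestamps, long gap, long minWindowSize) {
        List<String> out = new ArrayList<>();
        boolean open = false;       // is a session currently open?
        boolean announced = false;  // has "Window started" been emitted for it?
        long start = 0;
        long last = 0;

        for (long ts : sortedTimestamps) {
            // An event at or beyond last + gap no longer belongs to the open session.
            if (open && ts >= last + gap) {
                if (announced) {
                    out.add("Window ended @ " + (last + gap));
                }
                open = false;
            }
            if (!open) {
                open = true;
                announced = false;
                start = ts;
            }
            last = ts;
            // Point 1: announce once the session spans more than minWindowSize.
            if (!announced && last - start > minWindowSize) {
                out.add("Window started @ " + start);
                announced = true;
            }
        }
        // End of input: treat it like a watermark that passes every remaining session.
        if (open && announced) {
            out.add("Window ended @ " + (last + gap));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sessionize(new long[] {0, 5, 12, 100}, 10, 10));
    }
}
```

With events at 0, 5, 12 and 100, a gap of 10 and a minimum size of 10, the first three events form one session that exceeds the minimum size at timestamp 12, so it is announced and later closed at 22. The lone event at 100 never exceeds the minimum size and produces no messages. In a real job the same bookkeeping would live in keyed state inside a ProcessFunction, with an event-time timer replacing the "next event" check.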