Great to hear that you solved the problem. Let us know if you run into any other issues.
Cheers, Till On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com> wrote: > Hi, > This problem is solved[1]. The issue was that the BroadcastStream did not > contain any watermark, which prevented watermarks for any downstream > operators from advancing. > I appreciate all the help. > [1] > https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern > > Thanks, > Manas > > On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com> wrote: > >> Hi Rafi and Till, >> Thank you for pointing out that edge case, Rafi. >> >> Till, I am trying to get this example working with the BroadcastState >> pattern upstream to the window operator[1]. The problem is that introducing >> the BroadcastState makes the onEventTime() *never* fire. Is the >> BroadcastState somehow eating up the watermark? Do I need to generate the >> watermark again in the KeyedBroadcastProcessFunction? >> >> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49 >> >> Thanks, >> Manas >> >> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Manas and Rafi, >>> >>> you are right that when using merging windows as event time session >>> windows are, then Flink requires that any state the Trigger keeps is of >>> type MergingState. This constraint allows that the state can be merged >>> whenever two windows get merged. >>> >>> Rafi, you are right. With the current implementation it might happen >>> that you send a wrong started window message. I think it depends on the >>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also >>> your watermark. If you want to be on the safe side, then I would recommend >>> to use the ProcessFunction to implement the required logic. The >>> ProcessFunction [1] is Flink's low level API and gives you access to state >>> and timers. In it, you would need to buffer the elements and to sessionize >>> them yourself, though. However, it would give you access to the >>> watermark which in turn would allow you to properly handle your described >>> edge case. >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html >>> >>> Cheers, >>> Till >>> >>> Cheers, >>> Till >>> >>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> >>> wrote: >>> >>>> I think one "edge" case which is not handled would be that the first >>>> event (by event-time) arrives late, then a wrong "started-window" would be >>>> reported. >>>> >>>> Rafi >>>> >>>> >>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com> >>>> wrote: >>>> >>>>> Is the reason ValueState cannot be use because session windows are >>>>> always formed by merging proto-windows of single elements, therefore a >>>>> state store is needed that can handle merging. ValueState does not provide >>>>> this functionality, but a ReducingState does? >>>>> >>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Till, >>>>>> Thanks for your answer! You also answered the next question that I >>>>>> was about to ask "Can we share state between a Trigger and a Window?" >>>>>> Currently the only (convoluted) way to share state between two operators >>>>>> is >>>>>> through the broadcast state pattern, right? >>>>>> Also, in your example, why can't we use a >>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own >>>>>> example but it I am not able to call the mergePartitionedState() method >>>>>> on a ValueStateDescriptor. >>>>>> >>>>>> Regards, >>>>>> Manas >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org> >>>>>> wrote: >>>>>> >>>>>>> Hi Manas, >>>>>>> >>>>>>> you can implement something like this with a bit of trigger magic. >>>>>>> What you need to do is to define your own trigger implementation which >>>>>>> keeps state to remember whether it has triggered the "started window" >>>>>>> message or not. In the stateful window function you would need to do >>>>>>> something similar. The first call could trigger the output of "window >>>>>>> started" and any subsequent call will trigger the evaluation of the >>>>>>> window. >>>>>>> It would have been a bit easier if the trigger and the window process >>>>>>> function could share its internal state. Unfortunately, this is not >>>>>>> possible at the moment. >>>>>>> >>>>>>> I've drafted a potential solution which you can find here [1]. >>>>>>> >>>>>>> [1] >>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef >>>>>>> >>>>>>> Cheers, >>>>>>> Till >>>>>>> >>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> I want to achieve the following using event time session windows: >>>>>>>> >>>>>>>> 1. When the window.getStart() and last event timestamp in the >>>>>>>> window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit >>>>>>>> a >>>>>>>> message "Window started @ timestamp". >>>>>>>> 2. When the session window ends, i.e. the watermark passes >>>>>>>> lasteventTimestamp + inactivityPeriod, I want to emit a message >>>>>>>> "Window >>>>>>>> ended @ timestamp". >>>>>>>> >>>>>>>> It is guaranteed that all events are on time and no lateness is >>>>>>>> allowed. I am having difficulty implementing both 1 and 2 >>>>>>>> simultaneously. >>>>>>>> I am able to implement point 1 using a custom trigger, which checks >>>>>>>> if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and >>>>>>>> triggers >>>>>>>> a customProcessWindowFunction(). >>>>>>>> However, with this architecture I can't detect the end of the >>>>>>>> window. >>>>>>>> >>>>>>>> Is my approach correct or is there a completely different method to >>>>>>>> achieve this? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Manas Kale >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>>