Hi Till,
Thank you for the explanation, I understand the behaviour now.
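The ordering effect described in the quoted reply below can be reproduced without Flink. The following is a plain-Java model of a single keyed operator, not Flink code. It sessionizes in-order events, emits "window started" once a session spans more than MIN_WINDOW_SIZE, and keeps an end timer at last timestamp + gap that fires once the watermark reaches that end minus 1 (the window's last timestamp). The 5000 ms gap is inferred from "window1 ended @ 8000" for a session whose last event is at 3000, and MIN_WINDOW_SIZE = 1000 is a hypothetical value. A periodic assigner is approximated here by its worst case: a single watermark of 20000 that arrives only after all seven records. A punctuated assigner is approximated by a watermark equal to each record's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Plain-Java model (no Flink dependency) of one keyed operator that
 * sessionizes in-order events and fires "window ended" timers from watermarks.
 */
public class WatermarkOrderingSketch {
    static final long GAP = 5_000L;             // assumed inactivity gap (3000 + 5000 = "ended @ 8000")
    static final long MIN_WINDOW_SIZE = 1_000L; // hypothetical threshold for "window started"

    private final TreeSet<Long> pendingEnds = new TreeSet<>(); // end timers, ordered by event time
    private final List<String> log = new ArrayList<>();
    private boolean open = false;
    private boolean startedEmitted = false;
    private long start;
    private long last;

    void processElement(long ts) {
        if (!open || ts - last >= GAP) {
            // Gap exceeded: a new session begins; the previous session's end timer stays pending.
            open = true;
            startedEmitted = false;
            start = ts;
        } else {
            pendingEnds.remove(last + GAP); // session extended: move its end timer
        }
        last = ts;
        pendingEnds.add(last + GAP);
        if (!startedEmitted && last - start > MIN_WINDOW_SIZE) {
            log.add("window started @ " + start);
            startedEmitted = true;
        }
    }

    void processWatermark(long watermark) {
        // Fire, in timestamp order, every session whose last timestamp (end - 1) is <= watermark.
        while (!pendingEnds.isEmpty() && pendingEnds.first() - 1 <= watermark) {
            long end = pendingEnds.pollFirst();
            if (open && end == last + GAP) {
                open = false; // the currently open session has just closed
            }
            log.add("window ended @ " + end);
        }
    }

    static List<String> run(long[] timestamps, boolean watermarkPerRecord) {
        WatermarkOrderingSketch op = new WatermarkOrderingSketch();
        for (long ts : timestamps) {
            op.processElement(ts);
            if (watermarkPerRecord) {
                op.processWatermark(ts); // punctuated style: one watermark after each record
            }
        }
        if (!watermarkPerRecord) {
            // periodic style, worst case: the next watermark arrives after all records
            op.processWatermark(timestamps[timestamps.length - 1]);
        }
        return op.log;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 3000, 10000, 11000, 12000, 20000};
        System.out.println("periodic:   " + run(ts, false));
        System.out.println("punctuated: " + run(ts, true));
    }
}
```

With the single late watermark, both "started" messages are emitted before either end timer fires, which matches the first ordering reported below. With a watermark after every record, window1's end timer fires as soon as record "10" moves the watermark to 10000, which gives the expected ordering. In both runs the timers themselves fire in timestamp order; only their interleaving with element processing changes.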
On Thu, Mar 26, 2020 at 9:23 PM Till Rohrmann <trohrm...@apache.org> wrote:
> A quick update concerning your observations. The reason why you are seeing the unordered output is that in the gist we used an AssignerWithPeriodicWatermarks, which generates watermarks periodically. Because of this, it can happen that Flink has already processed all elements up to "20" before it sees the next watermark, which then triggers the processing. If multiple windows are processed at that point, Flink does not guarantee in which order this happens.
>
> You can avoid this behaviour if you use an AssignerWithPunctuatedWatermarks instead. This watermark assigner is called for every record. The updated gist [1] shows how it is used.
>
> [1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hmm, I might have given you bad advice. I think the problem becomes harder because, with Flink's window and trigger API, we need to keep state consistent between the Trigger and the window function. Maybe it would be easier not to rely on the windowing mechanism and instead use Flink's process function [1] to implement the logic yourself.
>>
>> With the process function you basically have a low-level API with which you can implement an operator that groups incoming events into sessions and outputs the required information.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Till,
>>> I see, thanks for the clarification.
>>> Assuming all other settings are the same, if I generate events as follows:
>>>
>>> Element.from("1", 1000L),
>>> Element.from("2", 2000L),
>>> Element.from("3", 3000L),
>>> Element.from("10", 10000L),
>>> Element.from("11", 11000L),
>>> Element.from("12", 12000L),
>>> Element.from("20", 20000L)
>>>
>>> we would expect two session windows to be created, {1,2,3} and {10,11,12}, with the appropriate messages. However, when I run this, there seems to be a problem with the ValueState of MyWindowFunction. Apparently that state is shared by both session windows, which leads to incorrect results. To solve this, I replaced it with a MapState<Long, Boolean>. The Long key is the start timestamp of a window, something that uniquely identifies different windows. This works, but with one caveat: if we have two subsequent windows, the ordering of messages is:
>>>
>>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000 -> window2 ended @ 17000
>>>
>>> whereas I expect it to be:
>>>
>>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000 -> window2 ended @ 17000
>>>
>>> I thought Flink would execute event-time timers and process events in chronological event-time order. However, it seems that the onEventTime() invocation of window1 is called *after* elements from window2 have been processed, even though window1's onEventTime() is earlier in event time.
>>>
>>> Is my approach and reasoning correct? Also, is it possible to get the messages in the expected order?
>>>
>>> Thanks!
>>>
>>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> the problem is that the print() statement is being executed with a parallelism other than 1. Because of this, the messages coming from the window function are sent in round-robin fashion to the print operators.
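The effect of the round-robin distribution described above can be sketched in plain Java (a model, not Flink's partitioner code). The single window-function task hands its messages to the print tasks one after another, and every print task writes to stdout independently, so once consecutive messages land in different tasks the printed order depends on scheduling. The print parallelism of 4 is a hypothetical value, and the model starts the rotation at subtask 0; the real starting subtask may differ.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (not Flink code): one upstream task sends its messages
 * round-robin to `downstream` print tasks.
 */
public class RoundRobinSketch {
    /** Returns the messages each downstream subtask receives, in arrival order. */
    static List<List<String>> distribute(List<String> messages, int downstream) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < downstream; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Assumption: the rotation starts at subtask 0.
            perSubtask.get(i % downstream).add(messages.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("window started", "window contents", "window ended");
        // Print parallelism 4 (hypothetical): each message goes to a different print task,
        // so their relative order on stdout is not guaranteed.
        System.out.println(distribute(msgs, 4));
        // Print parallelism 1: all messages reach the same task and are printed in order.
        System.out.println(distribute(msgs, 1));
    }
}
```

With a single downstream task, as with print().setParallelism(1), all three messages stay in one ordered sequence.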
>>>> If you remove the setParallelism(1) from the window function, the window function is executed with the same parallelism as the print operator. In that case there is no round-robin distribution of the events; every window function task simply forwards its elements to its own print operator task. You should be able to see these topology differences in the web UI.
>>>>
>>>> Alternatively, you could configure the print() operator to run with a parallelism of 1 as well by adding a setParallelism(1) statement to it.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <manaskal...@gmail.com> wrote:
>>>>
>>>>> Hi Till,
>>>>> When I run the example code that you posted, the order of the three messages (window started, contents of window and window ended) is non-deterministic. This is surprising to me, as setParallelism(1) has been used in the pipeline; I assumed this would eliminate any race conditions when printing. What's more, if I *remove* setParallelism(1) from the code, the output is deterministic and correct (i.e. windowStarted -> windowContents -> windowEnded).
>>>>>
>>>>> Clearly, something is wrong with my understanding. What is it?
>>>>>
>>>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Great to hear that you solved the problem. Let us know if you run into any other issues.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> This problem is solved [1]. The issue was that the BroadcastStream did not carry any watermarks, which prevented the watermarks of all downstream operators from advancing.
>>>>>>> I appreciate all the help.
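Why a broadcast input without watermarks stalls event time can be modelled in a few lines of plain Java (not Flink code). An operator with several inputs advances its event-time clock to the minimum of the latest watermark received on each input, and each input starts at Long.MIN_VALUE. An input that never emits a watermark therefore holds the clock, and with it every event-time timer, at Long.MIN_VALUE. Having the broadcast side emit Long.MAX_VALUE is only an illustrative choice here; in this example any broadcast watermark of at least 20000 would let the combined watermark reach 20000.

```java
import java.util.Arrays;

/**
 * Plain-Java model (not Flink code) of event-time progress in an operator with
 * several inputs: its watermark is the minimum of the latest watermark per input.
 */
public class MinWatermarkSketch {
    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkSketch(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark seen yet on any input
    }

    /** Records a watermark on one input and returns the operator's combined watermark. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        current = Math.max(current, min); // event time never moves backwards
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2); // input 0: keyed events, input 1: broadcast stream
        // Keyed side reaches 20000, broadcast side has emitted nothing: timers cannot fire.
        System.out.println(op.onWatermark(0, 20_000L));
        // Once the broadcast side emits a large watermark, the combined watermark becomes 20000.
        System.out.println(op.onWatermark(1, Long.MAX_VALUE));
    }
}
```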
>>>>>>>
>>>>>>> [1] https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Rafi and Till,
>>>>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>>>>
>>>>>>>> Till, I am trying to get this example working with the BroadcastState pattern upstream of the window operator [1]. The problem is that introducing the BroadcastState makes onEventTime() *never* fire. Is the BroadcastState somehow eating up the watermark? Do I need to generate the watermark again in the KeyedBroadcastProcessFunction?
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Manas
>>>>>>>>
>>>>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Manas and Rafi,
>>>>>>>>>
>>>>>>>>> you are right that when using merging windows, as event-time session windows are, Flink requires that any state the Trigger keeps is of type MergingState. This constraint allows the state to be merged whenever two windows get merged.
>>>>>>>>>
>>>>>>>>> Rafi, you are right. With the current implementation it can happen that you send a wrong "window started" message. I think it depends on the MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also on your watermark. If you want to be on the safe side, I would recommend using the ProcessFunction to implement the required logic. The ProcessFunction [1] is Flink's low-level API and gives you access to state and timers.
>>>>>>>>> In it, you would need to buffer the elements and sessionize them yourself, though. However, it would give you access to the watermark, which in turn would allow you to properly handle the edge case you described.
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think one "edge" case that is not handled is when the first event (by event time) arrives late: then a wrong "started-window" message would be reported.
>>>>>>>>>>
>>>>>>>>>> Rafi
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is the reason ValueState cannot be used that session windows are always formed by merging proto-windows of single elements, so a state store is needed that can handle merging? ValueState does not provide this functionality, but a ReducingState does?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>> Thanks for your answer! You also answered the next question I was about to ask: "Can we share state between a Trigger and a Window?" Currently the only (convoluted) way to share state between two operators is through the broadcast state pattern, right?
>>>>>>>>>>>> Also, in your example, why can't we use a ValueStateDescriptor<Boolean> in the Trigger?
>>>>>>>>>>>> I tried using it in my own example, but I am not able to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Manas
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Manas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> you can implement something like this with a bit of trigger magic. What you need to do is define your own trigger implementation which keeps state to remember whether it has already triggered the "started window" message. In the stateful window function you would need to do something similar: the first call could trigger the output of "window started", and any subsequent call would trigger the evaluation of the window. It would have been a bit easier if the trigger and the window process function could share their internal state. Unfortunately, this is not possible at the moment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've drafted a potential solution, which you can find here [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> I want to achieve the following using event-time session windows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. When the difference between window.getStart() and the last event timestamp in the window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message "Window started @ timestamp".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. When the session window ends, i.e. the watermark passes lastEventTimestamp + inactivityPeriod, I want to emit a message "Window ended @ timestamp".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It is guaranteed that all events are on time and no lateness is allowed. I am having difficulty implementing both 1 and 2 simultaneously.
>>>>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which checks if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a customProcessWindowFunction().
>>>>>>>>>>>>>> However, with this architecture I can't detect the end of the window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is my approach correct, or is there a completely different method to achieve this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Manas Kale
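For the event-time session windows in the original question, it may help to see how such windows come about. In the plain-Java model below (not Flink code), every element starts in its own proto-window [ts, ts + gap), overlapping windows are merged, and each window carries a piece of trigger state, here an element count. Merging needs a function that combines two states: a ReducingState has one, a ValueState does not, which is consistent with mergePartitionedState() rejecting a ValueStateDescriptor as reported further up the thread. Flink's built-in CountTrigger keeps its count in a ReducingState for the same reason. The 5000 ms gap and the sample timestamps follow the later example in this thread; windows that merely touch are kept separate here, which may differ from Flink's own rule.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.LongBinaryOperator;

/**
 * Plain-Java model (not Flink code) of merging event-time session windows.
 * Each window carries trigger-like state (an element count) that has to be
 * combined with a merge function whenever two windows are fused.
 */
public class SessionMergeSketch {
    static final class Window {
        final long start;
        final long end;   // exclusive
        final long count; // per-window state
        Window(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
        @Override
        public String toString() {
            return "[" + start + ", " + end + ") count=" + count;
        }
    }

    static List<Window> sessionize(long[] timestamps, long gap, LongBinaryOperator mergeState) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            Window merged = new Window(ts, ts + gap, 1); // proto-window holding one element
            List<Window> kept = new ArrayList<>();
            for (Window w : windows) {
                if (w.start < merged.end && merged.start < w.end) {
                    // Overlap: fuse the windows and combine their states with the merge function.
                    merged = new Window(Math.min(w.start, merged.start), Math.max(w.end, merged.end),
                            mergeState.applyAsLong(w.count, merged.count));
                } else {
                    kept.add(w);
                }
            }
            kept.add(merged);
            windows = kept;
        }
        windows.sort(Comparator.comparingLong(w -> w.start));
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 3000, 10000, 11000, 12000, 20000};
        // Summing is what a reducing state with a sum function would do on merge.
        System.out.println(sessionize(ts, 5000, Long::sum));
        // Keeping only the incoming value shows what is lost without a real merge function.
        System.out.println(sessionize(ts, 5000, (existing, incoming) -> incoming));
    }
}
```

With summing, the first two sessions are [1000, 8000) and [10000, 17000), matching the "ended" timestamps in the later example, and each holds three elements. With the overwrite variant, every count collapses to 1.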