Hi Till,
Thank you for the explanation. I understand the behaviour now.

On Thu, Mar 26, 2020 at 9:23 PM Till Rohrmann <trohrm...@apache.org> wrote:

> A quick update concerning your observations. The reason why you are seeing
> the unordered output is that the gist used
> an AssignerWithPeriodicWatermarks, which generates watermarks periodically.
> Because of this, Flink may already have processed all elements
> up to "20" before it sees the next watermark, which triggers the processing.
> If there are multiple windows being processed, Flink does not give a
> guarantee in which order this happens.
>
> You can avoid this behaviour if you use
> an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
> called for every record. The updated gist [1] shows how it is used.
>
> [1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
>
> Cheers,
> Till
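
To make the ordering effect described above concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code and not taken from either gist: the class name, the 5000 ms gap, and the MIN_WINDOW_SIZE of 1000 ms are assumptions chosen so that the windows end at 8000 and 17000 as in the thread. It assumes in-order timestamps and compares a watermark that advances after every record with one that only arrives after all records were processed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of session windows (NOT Flink code); all names and constants are assumptions. */
class WatermarkOrderingSketch {
    static final long GAP = 5000L;             // assumed session inactivity gap
    static final long MIN_WINDOW_SIZE = 1000L; // assumed threshold for "started"

    /** One open session: its first and last event timestamp. */
    static final class Session {
        final long start;
        long last;
        boolean startedEmitted;
        Session(long ts) { this.start = ts; this.last = ts; }
    }

    /** Processes in-order timestamps and returns the emitted messages in output order. */
    static List<String> run(long[] timestamps, boolean watermarkPerRecord) {
        List<String> out = new ArrayList<>();
        List<Session> open = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Extend the latest open session, or open a new one after a gap.
            Session s = open.isEmpty() ? null : open.get(open.size() - 1);
            if (s == null || ts - s.last >= GAP) {
                s = new Session(ts);
                open.add(s);
            } else {
                s.last = ts;
            }
            // "Started" is emitted while processing an element, not by a timer.
            if (!s.startedEmitted && s.last - s.start > MIN_WINDOW_SIZE) {
                s.startedEmitted = true;
                out.add("started@" + s.start);
            }
            maxTs = Math.max(maxTs, ts);
            if (watermarkPerRecord) {
                advanceWatermark(maxTs, open, out); // punctuated-style: after each record
            }
        }
        // Periodic-style worst case: the watermark shows up only after all records.
        advanceWatermark(maxTs, open, out);
        return out;
    }

    /** Closes every session whose end (last + GAP) is at or before the watermark. */
    static void advanceWatermark(long watermark, List<Session> open, List<String> out) {
        Iterator<Session> it = open.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.last + GAP <= watermark) {
                out.add("ended@" + (s.last + GAP));
                it.remove();
            }
        }
    }
}
```

With the timestamps 1000, 2000, 3000, 10000, 11000, 12000, 20000, the late-watermark run produces started@1000, started@10000, ended@8000, ended@17000, while the per-record run produces started@1000, ended@8000, started@10000, ended@17000.
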
>
> On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hmm, I might have given you bad advice. I think the problem becomes
>> harder because with Flink's window and trigger API we need to keep state
>> consistent between the Trigger and the Window function. Maybe it would be
>> easier to not rely on the windowing mechanism and instead to use Flink's
>> process function [1] to implement the logic yourself.
>>
>> With the process function you have basically a low level API with which
>> you can implement an operator which groups incoming events according to
>> sessions and outputs the required information.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Hi Till,
>>> I see, thanks for the clarification.
>>> Assuming all other settings are the same, if I generate events as follows:
>>>     Element.from("1", 1000L),
>>>     Element.from("2", 2000L),
>>>     Element.from("3", 3000L),
>>>     Element.from("10", 10000L),
>>>     Element.from("11", 11000L),
>>>     Element.from("12", 12000L),
>>>     Element.from("20", 20000L)
>>> we expect two session windows to be created, {1,2,3} and {10,11,12},
>>> with appropriate messages. However, when I run this, there seems to be a
>>> problem in the valueState of MyWindowFunction. Apparently that state is
>>> being shared by both the session windows, which leads to incorrect results.
>>> To solve this, I replaced it with a MapState<Long, Boolean>. The Long is
>>> the start timestamp of a window, something that can uniquely identify
>>> different windows. This works but with one caveat : if we have two
>>> subsequent windows, the ordering of messages is :
>>>
>>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @
>>> 8000 -> window2 ended @ 17000
>>>
>>> whereas I expect it to be :
>>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @
>>> 10000 -> window2 ended @ 17000
>>>
>>> I thought Flink would execute event time timers and process events in
>>> chronological event time order. However, it seems that the onEventTime()
>>> invocation of window1 is called *after* elements from window2 have been
>>> processed even though window1's onEventTime() is earlier in event time.
>>>
>>> Is my approach and reasoning correct? Also, is it possible to get the
>>> messages in the expected order?
>>>
>>> Thanks!
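
The state-sharing problem described above can be illustrated with a small plain-Java sketch. It is a toy model, not Flink code: an ordinary boolean stands in for a single ValueState<Boolean> that every window of a key sees, and a HashMap stands in for a MapState<Long, Boolean> keyed by window start. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT Flink code): one flag per key versus one flag per window. */
class WindowFlagSketch {

    /** A single flag shared by every window of the key, like one ValueState<Boolean>. */
    static List<String> withSharedFlag(long[] windowStartsPerCall) {
        List<String> out = new ArrayList<>();
        boolean started = false; // shared across all windows
        for (long start : windowStartsPerCall) {
            if (!started) {
                started = true;
                out.add("started@" + start);
            }
        }
        return out;
    }

    /** One flag per window start timestamp, like a MapState<Long, Boolean>. */
    static List<String> withPerWindowFlag(long[] windowStartsPerCall) {
        List<String> out = new ArrayList<>();
        Map<Long, Boolean> startedByWindow = new HashMap<>();
        for (long start : windowStartsPerCall) {
            if (!startedByWindow.getOrDefault(start, false)) {
                startedByWindow.put(start, true);
                out.add("started@" + start);
            }
        }
        return out;
    }
}
```

Each array entry represents one invocation of the window function, identified by the start of the window it was called for. With the calls 1000, 10000, 1000, 10000, the shared flag reports only the first window as started, whereas the per-window map reports both. In a long-running job the map entry would presumably need to be removed once its window has ended, so that the state does not grow without bound.
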
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> the problem is that the print() statement is being executed with a
>>>> different parallelism than 1. Due to this fact, the messages coming from
>>>> the window function will be sent in round-robin fashion to the print
>>>> operators. If you remove the setParallelism(1) from the window function,
>>>> then the window function will be executed with the same parallelism as the
>>>> print operator. Due to this fact, there is no round-robin distribution of
>>>> the events but every window function task will simply forward its
>>>> elements to its print operator task. You should be able to see these
>>>> topology differences in the web ui.
>>>>
>>>> You could configure the print() operator to run with a parallelism of 1
>>>> as well by adding a setParallelism(1) statement to it.
>>>>
>>>> Cheers,
>>>> Till
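
The difference between the two topologies described above can be sketched in plain Java. This is a toy model, not Flink code: lists stand in for the print subtasks, and the class and method names are hypothetical. It only shows which subtask each record lands on. The actual print order across subtasks depends on thread scheduling, which the model does not simulate.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT Flink code) of how records reach the print subtasks. */
class PrintDistributionSketch {

    /** Round-robin: one upstream subtask spreads its records over all downstream subtasks. */
    static List<List<String>> roundRobin(List<String> records, int downstreamParallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % downstreamParallelism).add(records.get(i));
        }
        return subtasks;
    }

    /** Forward: an upstream subtask hands every record to one downstream subtask, in order. */
    static List<String> forward(List<String> records) {
        return new ArrayList<>(records);
    }
}
```

With three print subtasks, the three messages of one window end up on three different subtasks that run concurrently, so nothing fixes the order in which they are printed. With forwarding, a single subtask receives all three and prints them in the order in which they were emitted.
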
>>>>
>>>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale <manaskal...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> When I run the example code that you posted, the order of the three
>>>>> messages (window started, contents of window and window ended) is
>>>>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>>>>> used in the pipeline - I assumed this should eliminate any form of race
>>>>> conditions for printing. What's more is that if I *remove*
>>>>> setParallelism(1) from the code, the output is deterministic and correct
>>>>> (i.e. windowStarted -> windowContents -> windowEnded).
>>>>>
>>>>> Clearly, something is wrong with my understanding. What is it?
>>>>>
>>>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Great to hear that you solved the problem. Let us know if you run
>>>>>> into any other issues.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> This problem is solved[1]. The issue was that the BroadcastStream
>>>>>>> did not contain any watermark, which prevented watermarks for any
>>>>>>> downstream operators from advancing.
>>>>>>> I appreciate all the help.
>>>>>>> [1]
>>>>>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rafi and Till,
>>>>>>>> Thank you for pointing out that edge case, Rafi.
>>>>>>>>
>>>>>>>> Till, I am trying to get this example working with the
>>>>>>>> BroadcastState pattern upstream to the window operator[1]. The problem
>>>>>>>> is that introducing the BroadcastState makes the onEventTime() *never*
>>>>>>>> fire. Is the BroadcastState somehow eating up the watermark? Do I need
>>>>>>>> to generate the watermark again in the KeyedBroadcastProcessFunction?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Manas
>>>>>>>>
>>>>>>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Manas and Rafi,
>>>>>>>>>
>>>>>>>>> you are right that when using merging windows, which event time
>>>>>>>>> session windows are, Flink requires that any state the Trigger keeps
>>>>>>>>> is of type MergingState. This constraint allows the state to be
>>>>>>>>> merged whenever two windows get merged.
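
To illustrate why per-window state needs a merge operation, here is a minimal plain-Java sketch of session windows built by merging single-element proto-windows. It is a toy model, not Flink's implementation: the class name is hypothetical, the input is sorted up front for simplicity, and "overlaps" is taken to mean strict overlap, which may differ from Flink's exact boundary rule. The per-window count stands in for state with a well-defined combine step (here, addition). A plain single value has no such step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (NOT Flink code): sessions formed by merging per-element proto-windows. */
class SessionMergeSketch {

    /** A window [start, end) together with a mergeable piece of state (an element count). */
    static final class Window {
        final long start;
        final long end;
        final int count;
        Window(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    /** Creates one proto-window per timestamp and merges overlapping windows. */
    static List<Window> sessionize(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // simplification: handle elements in timestamp order
        List<Window> merged = new ArrayList<>();
        for (long ts : sorted) {
            Window proto = new Window(ts, ts + gap, 1);
            int lastIdx = merged.size() - 1;
            if (lastIdx >= 0 && proto.start < merged.get(lastIdx).end) {
                // Overlap: merge the window bounds AND the state of both windows.
                Window prev = merged.get(lastIdx);
                merged.set(lastIdx, new Window(
                        prev.start, Math.max(prev.end, proto.end), prev.count + proto.count));
            } else {
                merged.add(proto);
            }
        }
        return merged;
    }
}
```

For the timestamps 1000, 2000, 3000, 10000, 11000, 12000, 20000 and a gap of 5000, this yields the windows [1000, 8000), [10000, 17000) and [20000, 25000) with counts 3, 3 and 1.
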
>>>>>>>>>
>>>>>>>>> Rafi, you are right. With the current implementation it might
>>>>>>>>> happen that you send a wrong "window started" message. I think it
>>>>>>>>> depends on the MIN_WINDOW_SIZE and the distribution of your timestamps
>>>>>>>>> and, hence, also your watermark. If you want to be on the safe side,
>>>>>>>>> I would recommend using the ProcessFunction to implement the required
>>>>>>>>> logic. The ProcessFunction [1] is Flink's low level API and gives you
>>>>>>>>> access to state and timers. In it, you would need to buffer the
>>>>>>>>> elements and sessionize them yourself, though. However, it would give
>>>>>>>>> you access to the watermark, which in turn would allow you to properly
>>>>>>>>> handle your described edge case.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I think one "edge" case which is not handled would be that the
>>>>>>>>>> first event (by event-time) arrives late; then a wrong
>>>>>>>>>> "started-window" would be reported.
>>>>>>>>>>
>>>>>>>>>> Rafi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <
>>>>>>>>>> manaskal...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is the reason ValueState cannot be used that session windows
>>>>>>>>>>> are always formed by merging proto-windows of single elements, so
>>>>>>>>>>> a state store is needed that can handle merging? ValueState does not
>>>>>>>>>>> provide this functionality, but a ReducingState does?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <
>>>>>>>>>>> manaskal...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>> Thanks for your answer! You also answered the next question
>>>>>>>>>>>> that I was about to ask: "Can we share state between a Trigger and a
>>>>>>>>>>>> Window?" Currently the only (convoluted) way to share state
>>>>>>>>>>>> between two operators is through the broadcast state pattern, right?
>>>>>>>>>>>> Also, in your example, why can't we use a
>>>>>>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in
>>>>>>>>>>>> my own example, but I am not able to call the
>>>>>>>>>>>> mergePartitionedState() method on a ValueStateDescriptor.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Manas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <
>>>>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Manas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> you can implement something like this with a bit of trigger
>>>>>>>>>>>>> magic. What you need to do is to define your own trigger
>>>>>>>>>>>>> implementation which keeps state to remember whether it has
>>>>>>>>>>>>> triggered the "started window" message or not. In the stateful
>>>>>>>>>>>>> window function you would need to do something similar. The first
>>>>>>>>>>>>> call could trigger the output of "window started" and any
>>>>>>>>>>>>> subsequent call will trigger the evaluation of the window.
>>>>>>>>>>>>> It would have been a bit easier if the trigger and the window
>>>>>>>>>>>>> process function could share their internal state. Unfortunately,
>>>>>>>>>>>>> this is not possible at the moment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <
>>>>>>>>>>>>> manaskal...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> I want to achieve the following using event time session
>>>>>>>>>>>>>> windows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. When the difference between window.getStart() and the last
>>>>>>>>>>>>>>    event timestamp in the window is greater than MIN_WINDOW_SIZE
>>>>>>>>>>>>>>    milliseconds, I want to emit a message "Window started @ timestamp".
>>>>>>>>>>>>>>    2. When the session window ends, i.e. the watermark
>>>>>>>>>>>>>>    passes lastEventTimestamp + inactivityPeriod, I want to emit
>>>>>>>>>>>>>>    a message "Window ended @ timestamp".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  It is guaranteed that all events are on time and no lateness
>>>>>>>>>>>>>> is allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>>>>>>> simultaneously.
>>>>>>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>>>>>>> checks if (lastEventTimestamp - window.getStart()) >
>>>>>>>>>>>>>> MIN_WINDOW_SIZE and triggers a customProcessWindowFunction().
>>>>>>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is my approach correct or is there a completely different
>>>>>>>>>>>>>> method to achieve this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Manas Kale
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
