Great to hear that you solved the problem. Let us know if you run into any
other issues.

Cheers,
Till
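
For readers of the archive: the fix quoted below comes down to how an operator with two inputs tracks event time. Its watermark is the minimum of the watermarks seen on each input, so an input that never emits one (here, the broadcast side) holds it at Long.MIN_VALUE. The following is a minimal plain-Java model of that rule, not Flink code; the class and method names are hypothetical and only illustrate the behaviour.

```java
/** Plain-Java model (not Flink API) of how a two-input operator combines watermarks. */
public class TwoInputWatermarkSketch {
    // Watermark of an input that has not emitted any watermark yet.
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    private long input1Watermark = NO_WATERMARK; // e.g. the keyed event stream
    private long input2Watermark = NO_WATERMARK; // e.g. the broadcast stream

    /** Records a watermark from input 1 and returns the operator's watermark. */
    public long onWatermark1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        return current();
    }

    /** Records a watermark from input 2 and returns the operator's watermark. */
    public long onWatermark2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        return current();
    }

    /** The operator's event-time clock: the minimum over both inputs. */
    public long current() {
        return Math.min(input1Watermark, input2Watermark);
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();

        // Only the event stream advances; the broadcast side stays silent.
        op.onWatermark1(1_000L);
        op.onWatermark1(5_000L);
        System.out.println(op.current() == NO_WATERMARK); // prints "true"

        // Once the broadcast side reports a watermark, event time can advance.
        op.onWatermark2(Long.MAX_VALUE);
        System.out.println(op.current()); // prints "5000"
    }
}
```

In this model, a second input whose watermark is always Long.MAX_VALUE never holds back the first one. Whether that is appropriate in a real job depends on whether the broadcast elements carry meaningful event time.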

On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com> wrote:

> Hi,
> This problem is solved[1]. The issue was that the BroadcastStream did not
> contain any watermark, which prevented watermarks for any downstream
> operators from advancing.
> I appreciate all the help.
> [1]
> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>
> Thanks,
> Manas
>
> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi Rafi and Till,
>> Thank you for pointing out that edge case, Rafi.
>>
>> Till, I am trying to get this example working with the BroadcastState
>> pattern upstream of the window operator[1]. The problem is that introducing
>> the BroadcastState makes the onEventTime() *never* fire. Is the
>> BroadcastState somehow eating up the watermark? Do I need to generate the
>> watermark again in the KeyedBroadcastProcessFunction?
>>
>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>
>> Thanks,
>> Manas
>>
>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Manas and Rafi,
>>>
>>> you are right that with merging windows, which event time session
>>> windows are, Flink requires any state the Trigger keeps to be of type
>>> MergingState. This constraint allows the state to be merged whenever
>>> two windows get merged.
>>>
>>> Rafi, you are right. With the current implementation it might happen
>>> that you send a wrong "started window" message. I think it depends on the
>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>> your watermark. If you want to be on the safe side, I would recommend
>>> using the ProcessFunction to implement the required logic. The
>>> ProcessFunction [1] is Flink's low-level API and gives you access to state
>>> and timers. In it, you would need to buffer the elements and sessionize
>>> them yourself, though. However, it would give you access to the
>>> watermark, which in turn would allow you to properly handle the edge case
>>> you described.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com>
>>> wrote:
>>>
>>>> I think one "edge" case which is not handled is when the first
>>>> event (by event-time) arrives late; a wrong "started-window" would then
>>>> be reported.
>>>>
>>>> Rafi
>>>>
>>>>
>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com>
>>>> wrote:
>>>>
>>>>> Is the reason ValueState cannot be used that session windows are
>>>>> always formed by merging proto-windows of single elements, so a
>>>>> state store is needed that can handle merging? ValueState does not provide
>>>>> this functionality, but a ReducingState does?
>>>>>
>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>> was about to ask: "Can we share state between a Trigger and a Window?"
>>>>>> Currently the only (convoluted) way to share state between two
>>>>>> operators is through the broadcast state pattern, right?
>>>>>> Also, in your example, why can't we use a
>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>> example, but I am not able to call the mergePartitionedState() method
>>>>>> on a ValueStateDescriptor.
>>>>>>
>>>>>> Regards,
>>>>>> Manas
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Manas,
>>>>>>>
>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>> started" and any subsequent call will trigger the evaluation of the
>>>>>>> window. It would have been a bit easier if the trigger and the window
>>>>>>> process function could share their internal state. Unfortunately, this
>>>>>>> is not possible at the moment.
>>>>>>>
>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>
>>>>>>> [1]
>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>
>>>>>>>>    1. When the difference between window.getStart() and the last event
>>>>>>>>    timestamp in the window is greater than MIN_WINDOW_SIZE
>>>>>>>>    milliseconds, I want to emit a message "Window started @ timestamp".
>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>    lastEventTimestamp + inactivityPeriod, I want to emit a message
>>>>>>>>    "Window ended @ timestamp".
>>>>>>>>
>>>>>>>> It is guaranteed that all events are on time and no lateness is
>>>>>>>> allowed. I am having difficulty implementing both 1 and 2
>>>>>>>> simultaneously.
>>>>>>>> I am able to implement point 1 using a custom trigger, which checks
>>>>>>>> if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and
>>>>>>>> triggers a customProcessWindowFunction().
>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>> window.
>>>>>>>>
>>>>>>>> Is my approach correct or is there a completely different method to
>>>>>>>> achieve this?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Manas Kale
