Yes, sir. The allowedLateness and side output were always there.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com>
wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> As in, this is essentially doing what lateness *should* have done, and I
>> think that is a bug. My code now is below. Please look at the
>> allowedLateness on the session window.
>>
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>         .name("late_filter").uid("late_filter");
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>         .name("late_data").uid("late_data");
>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>         .keyBy(value -> value.getKey())
>>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>         .allowedLateness(Time.minutes(lateNessInMinutes))
>>         .sideOutputLateData(lateOutputTag)
>>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>         .aggregate(new SortAggregate<KEY, VALUE>(),
>>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>         .name("session_aggregate").uid("session_aggregate");
>>
>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I can do that, but I am not certain this is the right filter. Can you
>>> please validate it? That aside, I already have lateness configured for the
>>> session window (the normal allowedLateness()), and this looks like a
>>> session window was not collected and is still alive for some reason (a
>>> Flink bug?).
>>>
>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>     out.collect(value);
>>> }
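A plain-Java sketch of the filter above, for checking its logic outside Flink. It assumes (from my reading of Flink 1.12's `WindowOperator`, not from this thread) that the window operator treats an element as late when `timestamp + allowedLateness <= currentWatermark`, so the `>` in the filter keeps exactly the complement. The class and method names are hypothetical, and the watermark is passed in explicitly instead of coming from `ctx.timerService()`. Note that the check only looks at the element's own timestamp, not at the session window it may later merge into.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of LateEventFilter / LateEventSideOutput.
// In the real pipeline the timestamp and watermark come from Flink's context.
final class LatenessSplitSketch {

    // Same test as the process function: keep if timestamp + lateness > watermark.
    static boolean isOnTime(long timestamp, long allowedLatenessMs, long currentWatermark) {
        return timestamp + allowedLatenessMs > currentWatermark;
    }

    // Splits timestamps into [onTime, late] in one pass, instead of running
    // two separate functions over the same keyed stream.
    static List<List<Long>> split(List<Long> timestamps, long allowedLatenessMs, long currentWatermark) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isOnTime(ts, allowedLatenessMs, currentWatermark)) {
                onTime.add(ts);
            } else {
                late.add(ts);
            }
        }
        List<List<Long>> result = new ArrayList<>();
        result.add(onTime);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        long lateness = Duration.ofMinutes(60).toMillis(); // 3_600_000 ms
        long watermark = 10_000_000L;
        // 6_400_000 + 3_600_000 == watermark, so that element counts as late.
        System.out.println(split(List.of(9_000_000L, 6_400_000L, 6_400_001L), lateness, watermark));
        // prints [[9000000, 6400001], [6400000]]
    }
}
```

At the exact boundary (timestamp + lateness == watermark) the element lands on the late side, which matches the `<=` assumed for the window operator.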
>>>
>>>
>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>> based on the error message and the behavior you described, introducing
>>>> a filter for late events is the way to go - just as described in the SO
>>>> thread you mentioned. Usually, you would collect late events in some kind
>>>> of side output [1].
>>>>
>>>> I hope that helps.
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> I saw
>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>>> which seems to suggest a straight-up filter, but I am not sure how that
>>>>> filter works: would it factor in the lateness when filtering?
>>>>>
>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Well, it was not a solution after all. We now have a session window
>>>>>> that is stuck with the same issue, albeit after the additional lateness.
>>>>>> I had increased the lateness to 2 days, and that masked the issue, which
>>>>>> reared its head again once the 2-day lateness was over (instead of after
>>>>>> 1 day, as before). This is very disconcerting.
>>>>>>
>>>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
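As a sanity check on the numbers in both reported failures (this one and the earlier 1-day one quoted further down), here is a plain-Java sketch of the condition that, as far as I can tell from Flink 1.12's `WindowOperator`, raises this exception: the merged window's `maxTimestamp()` (which is `end - 1` for a `TimeWindow`) plus the allowed lateness is `<=` the current watermark. The lateness values (1 day, then 2 days) come from the description in this thread; the class and method names are hypothetical. By this arithmetic, both merged windows were already past the point where the operator would normally have cleaned them up, by roughly 54 seconds and 23 minutes respectively.

```java
import java.time.Duration;

// Hypothetical plain-Java model of the merge-time check; in Flink it runs
// inside WindowOperator while session windows are being merged.
final class MergeCheckSketch {

    // TimeWindow.maxTimestamp() is end - 1, because the end bound is exclusive.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // True when merging into this window would raise the
    // "cannot become earlier than the current watermark" exception.
    static boolean mergeWouldFail(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        return maxTimestamp(windowEnd) + allowedLatenessMs <= currentWatermark;
    }

    // How far (ms) the watermark is past maxTimestamp + allowedLateness.
    static long overshootMs(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        return currentWatermark - (maxTimestamp(windowEnd) + allowedLatenessMs);
    }

    public static void main(String[] args) {
        long oneDay = Duration.ofDays(1).toMillis();
        long twoDays = Duration.ofDays(2).toMillis();

        // First failure (lateness 1 day): watermark 1618966593999, end=1618880140466
        System.out.println(mergeWouldFail(1618880140466L, oneDay, 1618966593999L));  // true
        System.out.println(overshootMs(1618880140466L, oneDay, 1618966593999L));     // 53534

        // Second failure (lateness 2 days): watermark 1619053742129, end=1618879580402
        System.out.println(mergeWouldFail(1618879580402L, twoDays, 1619053742129L)); // true
        System.out.println(overshootMs(1618879580402L, twoDays, 1619053742129L));    // 1361728
    }
}
```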
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey folks,
>>>>>>> I had a sessionization pipeline that restarted and then failed after
>>>>>>> retries with this exception. The only thing I had done was to increase
>>>>>>> the lateness by 12 hours (to a day) in this pipeline and restart from a
>>>>>>> savepoint (SP); it then ran for 12+ hours without issue. I cannot
>>>>>>> imagine that increasing the lateness caused this, and the way I solved
>>>>>>> it was to increase the lateness further. Could this happen if there are
>>>>>>> TMs in the cluster whose clocks are off (as in, not synchronized)?
>>>>>>>
>>>>>>> 2021-04-21 11:27:58
>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>
> --
>
> Matthias Pohl | Engineer
>
