Yes sir. The allowedLateness and the side output were always there.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com> wrote:
> You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1], but without the `LateEventFilter`/`LateEventSideOutput` added to your pipeline, when you previously ran into the UnsupportedOperationException issue?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> As in, this is essentially doing what lateness *should* have done, and I think that is a bug. My code now is as follows. Please look at the allowedLateness on the session window.
>>
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>     .name("late_filter").uid("late_filter");
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>     .name("late_data").uid("late_data");
>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>     .keyBy(value -> value.getKey())
>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>     .sideOutputLateData(lateOutputTag)
>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>     .name("session_aggregate").uid("session_aggregate");
>>
>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> I can do that, but I am not certain this is the right filter. Can you please validate it?
>>> That aside, I already have lateness configured for the session window (the normal allowedLateness()), and this looks like a session window was not cleaned up and is still alive for some reason (a Flink bug?).
>>>
>>>     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>         out.collect(value);
>>>     }
>>>
>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>
>>>> Hi Vishal,
>>>> based on the error message and the behavior you described, introducing a filter for late events is the way to go, just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].
>>>>
>>>> I hope that helps.
>>>> Matthias
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> I saw https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current and it seems to suggest a straight-up filter, but I am not sure how that filter works: would it factor in the lateness when filtering?
>>>>>
>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Well, it was not a solution after all. We now have a session window that is stuck with the same issue, albeit after the additional lateness. I had increased the lateness to 2 days, which masked the issue; it reared its head again once the 2 days of lateness were over (instead of after 1 day, as before). This is very disconcerting.
>>>>>>
>>>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging.
>>>>>> Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>
>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey folks,
>>>>>>> I had a sessionization pipeline restart and then fail after retries with this exception. The only change I had made was to increase the lateness by 12 hours (to a day) in this pipeline and restart from a savepoint; it then ran for 12+ hours without issue. I cannot imagine that increasing the lateness caused this, and the way I solved it was to increase the lateness further. Could this happen if there are TMs in the cluster whose clocks are off (as in, not synchronized)?
>>>>>>>
>>>>>>> 2021-04-21 11:27:58
>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
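The whole thread turns on one comparison: whether a timestamp plus the allowed lateness is still ahead of the watermark. Below is a minimal, Flink-free Java sketch of that arithmetic. It uses the window ends and watermarks from the two exceptions in this thread and the lateness values Vishal mentions (1 day, then 2 days). `LatenessCheckSketch` and its methods are hypothetical names. `passesLateFilter` mirrors the predicate quoted in the 9:59 AM message. `mergedWindowIsLate` *assumes* that a merged window is rejected when its last timestamp (end - 1) plus the allowed lateness is at or behind the watermark. That assumption is consistent with the exception text and with the stack trace, which shows the failure inside the merge step of `MergingWindowSet.addWindow`, but it is not a quote of Flink's source.

```java
// Hypothetical, Flink-free sketch: plain arithmetic on the numbers quoted in this thread.
public class LatenessCheckSketch {

    static final long HOUR_MS = 60L * 60L * 1000L;
    static final long DAY_MS = 24L * HOUR_MS;

    // Mirrors the per-element filter from the thread: forward the element only
    // while timestamp + allowedLateness is still ahead of the current watermark.
    static boolean passesLateFilter(long eventTimestamp, long allowedLateness, long watermark) {
        return eventTimestamp + allowedLateness > watermark;
    }

    // ASSUMPTION (not Flink's code): a merged window counts as late when its last
    // timestamp (end - 1) plus the allowed lateness is at or behind the watermark.
    static boolean mergedWindowIsLate(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    // How far (in ms) the watermark had advanced past the window's end.
    static long watermarkPastEnd(long windowEnd, long watermark) {
        return watermark - windowEnd;
    }

    public static void main(String[] args) {
        // Apr 21 exception, run with 1 day of lateness.
        long end1 = 1618880140466L;
        long wm1 = 1618966593999L;
        // Apr 22 exception, run with 2 days of lateness.
        long end2 = 1618879580402L;
        long wm2 = 1619053742129L;

        System.out.printf("Apr 21: watermark %.2f h past window end, late=%b%n",
                watermarkPastEnd(end1, wm1) / (double) HOUR_MS,
                mergedWindowIsLate(end1, DAY_MS, wm1));
        System.out.printf("Apr 22: watermark %.2f h past window end, late=%b%n",
                watermarkPastEnd(end2, wm2) / (double) HOUR_MS,
                mergedWindowIsLate(end2, 2 * DAY_MS, wm2));
    }
}
```

Under that assumption, both reported windows come out as late. On Apr 21 the watermark was about 24 hours past the window end with 1 day of lateness. On Apr 22 it was about 48.4 hours past it with 2 days of lateness. This matches the observation that raising the lateness only postponed the failure. `passesLateFilter` also answers the 3:22 PM question in arithmetic terms: the lateness is part of the comparison, so an element is forwarded only while its timestamp is within the allowed lateness of the watermark.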