As in, this is essentially doing what lateness *should* have done, and I think that is a bug. My code now is below; please look at the allowedLateness on the session window. (Illustrative sketches of the late filter and of reading the window's late side output are at the bottom of this mail.)
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
        .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
        .name("late_filter").uid("late_filter");

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
        .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
        .name("late_data").uid("late_data");

SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
        .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
        .keyBy(value -> value.getKey())
        .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
        .allowedLateness(Time.minutes(lateNessInMinutes))
        .sideOutputLateData(lateOutputTag)
        .trigger(PurgingTrigger.of(CountTrigger.of(1)))
        .aggregate(new SortAggregate<KEY, VALUE>(),
                new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
        .name("session_aggregate").uid("session_aggregate");

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> I can do that, but I am not certain this is the right filter. Can you
> please validate it? That aside, I already have the lateness configured for
> the session window (the normal withLateNess()), and this looks like a
> session window that was not collected and is still alive for some reason
> (a Flink bug?).
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>     out.collect(value);
> }
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described, introducing a
>> filter for late events is the way to go - just as described in the SO
>> thread you mentioned. Usually, you would collect late events in some kind
>> of side output [1].
>>
>> I hope that helps.
>> Matthias
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> I saw
>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>> and it seems to suggest a straight-up filter, but I am not sure how that
>>> filter works - as in, would it factor in the lateness when filtering?
>>>
>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Well, it was not a solution after all. We now have a session window
>>>> that is stuck with the same issue, albeit after the additional lateness.
>>>> I had increased the lateness to 2 days and that masked the issue, which
>>>> again reared its head after the 2 days' lateness was over (instead of
>>>> the 1 day before). This is very disconcerting.
>>>>
>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp
>>>> of an event-time window cannot become earlier than the current
>>>> watermark by merging. Current watermark: 1619053742129 window:
>>>> TimeWindow{start=1618877773663, end=1618879580402}
>>>>
>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hey folks,
>>>>> I had a pipe with sessionization restart and then fail after retries
>>>>> with this exception. The only thing I had done was to increase the
>>>>> lateness by 12 hours (to a day) in this pipe and restart from a
>>>>> savepoint, and it ran for 12 hours plus without issue.
>>>>> I cannot imagine that increasing the lateness created this, and the
>>>>> way I solved this was to increase the lateness further. Could this
>>>>> happen if there are TMs in the cluster whose time is off (as in, not
>>>>> synchronized)?
>>>>>
>>>>> 2021-04-21 11:27:58
>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>> event-time window cannot become earlier than the current watermark by
>>>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>     at java.lang.Thread.run(Thread.java:748)
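
For reference (as mentioned at the top of this mail), a trimmed-down sketch of
what I mean by the late filter. Treat it as illustrative rather than the exact
class: KeyedTimedValue is the element type from the pipeline above, and the
watermark comparison is the same one discussed in the thread.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LateEventFilter<KEY, VALUE>
        extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {

    private final long allowedLatenessMillis; // this.lateNessInMinutes * 60 * 1000L

    public LateEventFilter(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Assumes event-time timestamps were assigned upstream. Keep only
        // elements still within the allowed lateness relative to the current
        // watermark; anything later is dropped here so it can never reach the
        // merging session-window operator.
        if (ctx.timestamp() + allowedLatenessMillis > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}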
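
And for the side-output piece Matthias pointed at in [1]: whatever the session
window itself rejects after allowedLateness is routed to lateOutputTag by
sideOutputLateData(...), and can be pulled back off the aggregate operator
roughly like this (illustrative; lateOutputTag is the same OutputTag instance
used in the pipeline above):

import org.apache.flink.streaming.api.datastream.DataStream;

// Events the window drops as late are emitted to the tag instead of being
// discarded silently; getSideOutput() returns them as a regular stream.
DataStream<KeyedTimedValue<KEY, VALUE>> lateFromWindow =
        aggregate.getSideOutput(lateOutputTag);

// e.g. log/sink them to check whether the "stuck" sessions line up with
// genuinely late data
lateFromWindow.print();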