The only thing I can think of is to factor the configured lateness into the
filter, as below: the timestamp on the element + lateness should always be
greater than the current watermark (WM). In the current failure the window end
is Mon Apr 19 20:46:20 EDT 2021 and the WM is Wed Apr 21 21:09:02 EDT 2021.
An event forced this merged window, and it likely has a timestamp of
Mon Apr 19 20:46:20 EDT 2021. We are filtering such events out so that we do
not hit
https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125

Either way, the solution is yucky, and I am not sure how this happened in the
first place.

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Drops elements that are already past the allowed lateness.
    public class LateEventFilter<KEY, VALUE>
            extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {

        private static final long serialVersionUID = 1L;

        private final long allowedLateness;

        public LateEventFilter(long allowedLateness) {
            this.allowedLateness = allowedLateness;
        }

        @Override
        public void processElement(
                KeyedTimedValue<KEY, VALUE> value,
                Context ctx,
                Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
            // Forward the element only while timestamp + lateness is ahead of the watermark.
            if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
                out.collect(value);
            }
        }
    }

On Thu, Apr 22, 2021 at 8:52 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight-up filter, but I am not sure how that
> filter works, as in, would it factor in the lateness when filtering?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Well, it was not a solution after all. We now have a session window that
>> is stuck with the same issue, albeit after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue, which reared
>> its head again after the 2 days of lateness were over (instead of after
>> the 1 day before). This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>> I had a pipe with sessionization restart and then fail after retries
>>> with this exception. The only thing I had done was to increase the
>>> lateness by 12 hours (to a day) in this pipe and restart from SP, and it
>>> ran for 12 hours plus without issue. I cannot imagine that increasing
>>> the lateness created this, and the way I solved it was to increase the
>>> lateness further. Could this happen if there are TMs in the cluster
>>> whose time is off (as in not synchronized)?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>     at java.lang.Thread.run(Thread.java:748)
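
P.S. As a sanity check on the arithmetic only (plain Java, not Flink code), the
sketch below plugs the numbers from the two exceptions in this thread into the
same predicate that LateEventFilter uses. It treats the window end as a
stand-in for the event timestamp, which is an assumption on my part, and the
names LatenessCheck and keep are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

class LatenessCheck {
    // Same comparison as LateEventFilter: keep the element only while
    // timestamp + allowedLateness is still ahead of the watermark.
    static boolean keep(long timestampMs, long allowedLatenessMs, long watermarkMs) {
        return timestampMs + allowedLatenessMs > watermarkMs;
    }

    // Prints the decoded values and the filter decision for one failure.
    static void report(String label, long windowEndMs, long watermarkMs, Duration lateness) {
        System.out.println(label);
        System.out.println("  window end (UTC): " + Instant.ofEpochMilli(windowEndMs));
        System.out.println("  watermark  (UTC): " + Instant.ofEpochMilli(watermarkMs));
        System.out.println("  watermark - end : " + Duration.ofMillis(watermarkMs - windowEndMs));
        System.out.println("  keep with " + lateness + " lateness? "
                + keep(windowEndMs, lateness.toMillis(), watermarkMs));
    }

    public static void main(String[] args) {
        // Values copied from the exception messages quoted in this thread.
        // Assumption: the window end stands in for the offending event's timestamp.
        report("first failure (lateness was 1 day)",
                1618880140466L, 1618966593999L, Duration.ofDays(1));
        report("second failure (lateness was 2 days)",
                1618879580402L, 1619053742129L, Duration.ofDays(2));
    }
}
```

With these inputs both checks come out false: the first window end trails its
watermark by a little over 1 day and the second by a little over 2 days, which
lines up with the lateness that was configured each time.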