I can do that, but I am not certain the filter below is the right one. Can you please validate it? That aside, I already have the lateness configured for the session window (the usual allowedLateness()), and this looks like a session window that was not collected and is still alive for some reason (a Flink bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}

On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing a
> filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight-up filter, but I am not sure how
>> that filter works, as in, would it factor in the lateness when filtering?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Well, it was not a solution after all. We now have a session window that
>>> is stuck with the same issue, albeit after the additional lateness. I had
>>> increased the lateness to 2 days and that masked the issue, which again
>>> reared its head after the 2 days of lateness were over (instead of the
>>> 1 day before). This is very disconcerting.
>>>
>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>>> an event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1619053742129 window:
>>> TimeWindow{start=1618877773663, end=1618879580402}
>>>
>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Hey folks,
>>>> I had a pipe with sessionization restart and then fail
>>>> after retries with this exception.
>>>> The only thing I had done was to
>>>> increase the lateness by 12 hours (to a day) in this pipe and restart
>>>> from SP, and it ran for 12 hours plus without issue. I cannot imagine that
>>>> increasing the lateness created this, and the way I solved this was to
>>>> increase the lateness further. Could this be if there are TMs in the
>>>> cluster whose time is off (as in not synchronized)?
>>>>
>>>> 2021-04-21 11:27:58
>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>> event-time window cannot become earlier than the current watermark by
>>>> merging. Current watermark: 1618966593999 window:
>>>> TimeWindow{start=1618878336107, end=1618880140466}
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>     at java.lang.Thread.run(Thread.java:748)
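
For what it's worth, the numbers in the two exceptions quoted in this thread can be checked by hand. The sketch below is plain Java with no Flink dependency. The class and method names are hypothetical, the 1-day and 2-day lateness values are the ones mentioned in this thread, and the window-level rule (a window counts as expired once end - 1 + allowedLateness <= watermark) is an assumption about how the operator decides, not something confirmed here.

```java
/**
 * Sketch only: a plain-Java model of the lateness arithmetic in this thread.
 * All names are hypothetical; nothing here calls Flink.
 */
final class LatenessCheck {

    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /**
     * The element-level filter from the reply: keep the event only if
     * timestamp + allowedLateness is still ahead of the watermark.
     */
    static boolean passesFilter(long eventTs, long allowedLatenessMs, long watermark) {
        return eventTs + allowedLatenessMs > watermark;
    }

    /**
     * Window-level check. ASSUMPTION: a window is expired once
     * (end - 1) + allowedLateness <= watermark.
     */
    static boolean windowExpired(long windowEnd, long allowedLatenessMs, long watermark) {
        return (windowEnd - 1) + allowedLatenessMs <= watermark;
    }

    /** Milliseconds by which the watermark has passed end + allowedLateness (negative if it has not). */
    static long overshootMs(long windowEnd, long allowedLatenessMs, long watermark) {
        return watermark - (windowEnd + allowedLatenessMs);
    }

    public static void main(String[] args) {
        // First exception: lateness was 1 day.
        long wm1 = 1618966593999L, start1 = 1618878336107L, end1 = 1618880140466L;
        // Second exception: lateness had been raised to 2 days.
        long wm2 = 1619053742129L, start2 = 1618877773663L, end2 = 1618879580402L;

        System.out.println("first:  expired=" + windowExpired(end1, DAY_MS, wm1)
                + " overshootMs=" + overshootMs(end1, DAY_MS, wm1)
                + " filterKeepsStartEvent=" + passesFilter(start1, DAY_MS, wm1));
        System.out.println("second: expired=" + windowExpired(end2, 2 * DAY_MS, wm2)
                + " overshootMs=" + overshootMs(end2, 2 * DAY_MS, wm2)
                + " filterKeepsStartEvent=" + passesFilter(start2, 2 * DAY_MS, wm2));
    }
}
```

Under these assumptions both windows were already past their allowed lateness when the merge was attempted (by 53,533 ms and 1,361,727 ms respectively), and the element-level filter would have dropped an event stamped at either window's start. Note that the filter compares the event timestamp rather than the window end, so it is slightly stricter than the window-level rule.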