After talking to David about this issue offline, I decided to create a Jira ticket, FLINK-22425 [1], to cover it. Thanks for reporting it on the mailing list, Vishal. Hopefully, the community has a chance to look into it.
Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22425

On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <matth...@ververica.com> wrote:

> To me, it sounds strange. I would have expected it to work with
> `allowedLateness` and `sideOutput` being defined. I'm pulling in David to
> have a look at it. Maybe he has some more insights. I haven't worked that
> much with lateness yet.
>
> Matthias
>
> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Added the Filter upfront as below, and the pipe has no issues. Also,
>> metrics show that no data is being pushed through the side output and that
>> data is *now* pulled from the simulated side output, which is essentially the
>> Process Function with the reverse predicate of the Filter Process Function.
>>
>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> And when I added the filter, the Exception was not thrown. So the
>>> sequence of events:
>>>
>>> * Increased lateness from 12 hours (what the pipe was initially running
>>> with) to 24 hours.
>>> * The pipe ran as desired before it blew up with the Exception.
>>> * Masked the issue by increasing the lateness to 48 hours.
>>> * It blew up again, but now after the added lateness; essentially the
>>> same issue, but the added lateness let the pipe run for another few hours.
>>> * Added the Filter upfront as below; the pipe has no issues.
>>> Also, metrics show that no data is being pushed through the side output
>>> and that data is not pulled from the simulated side output (below).
>>>
>>> public class LateEventFilter extends
>>>     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   long allowedLateness;
>>>
>>>   public LateEventFilter(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>> public class LateEventSideOutput extends
>>>     ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   long allowedLateness;
>>>
>>>   public LateEventSideOutput(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>> I am using RocksDB as a backend, if that helps.
>>>
>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Yes sir. The allowedLateness and side output always existed.
>>>>
>>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>
>>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>>> described in [1], but without the `LateEventFilter`/`LateEventSideOutput`
>>>>> being added to your pipeline, when running into the
>>>>> UnsupportedOperationException issue previously?
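The complementary predicates in the quoted `LateEventFilter`/`LateEventSideOutput` can be sketched without any Flink dependencies. In the sketch below, the `Event` record, the fixed watermark, and the class name are the editor's illustrative stand-ins for Flink's timestamped elements and `ProcessFunction.Context`; only the two comparisons come from the thread:

```java
import java.util.List;
import java.util.stream.Collectors;

public class LatePredicateSketch {
    // Illustrative stand-in for an element carrying an event-time timestamp (ms).
    record Event(String key, long timestamp) {}

    // Mirrors LateEventFilter: keep events still within the allowed lateness.
    static boolean onTime(Event e, long watermark, long allowedLateness) {
        return e.timestamp() + allowedLateness > watermark;
    }

    // Mirrors LateEventSideOutput: the exact complement of the filter.
    static boolean late(Event e, long watermark, long allowedLateness) {
        return e.timestamp() + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 10_000L;
        long lateness = 2_000L;
        List<Event> events = List.of(
                new Event("a", 9_500L),  // ahead of the watermark
                new Event("b", 8_100L),  // behind, but within lateness: 8100 + 2000 > 10000
                new Event("c", 7_000L)); // late: 7000 + 2000 <= 10000

        List<String> kept = events.stream()
                .filter(e -> onTime(e, watermark, lateness))
                .map(Event::key).collect(Collectors.toList());
        List<String> side = events.stream()
                .filter(e -> late(e, watermark, lateness))
                .map(Event::key).collect(Collectors.toList());

        // Every event lands in exactly one of the two streams.
        System.out.println("kept=" + kept + " side=" + side);
        // prints: kept=[a, b] side=[c]
    }
}
```

Because the two predicates are strict complements, no element is dropped or duplicated by the split, which is what the metrics in the thread were checking.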
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>
>>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> As in, this is essentially doing what lateness *should* have done,
>>>>>> and I think that is a bug. My code is now as below. Please look at the
>>>>>> allowedLateness on the session window.
>>>>>>
>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>>>     .name("late_filter").uid("late_filter");
>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>>>     .name("late_data").uid("late_data");
>>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>>     .keyBy(value -> value.getKey())
>>>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>>     .sideOutputLateData(lateOutputTag)
>>>>>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>>>>>     .name("session_aggregate").uid("session_aggregate");
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>>> you please validate it?
>>>>>>> That aside, I already have the lateness configured for
>>>>>>> the session window (the normal allowedLateness()), and this looks like a
>>>>>>> session window that was not collected and is still alive for some reason (a
>>>>>>> Flink bug?).
>>>>>>>
>>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>>   out.collect(value);
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>> based on the error message and the behavior you described,
>>>>>>>> introducing a filter for late events is the way to go, just as described
>>>>>>>> in the SO thread you mentioned. Usually, you would collect late events in
>>>>>>>> some kind of side output [1].
>>>>>>>>
>>>>>>>> I hope that helps.
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I saw
>>>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>>>>>>> and this seems to suggest a straight-up filter, but I am not sure how
>>>>>>>>> that filter works, as in, would it factor in the lateness when filtering?
>>>>>>>>>
>>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Well, it was not a solution after all. We now have a session
>>>>>>>>>> window that is stuck with the same issue, albeit after the additional
>>>>>>>>>> lateness. I had increased the lateness to 2 days, and that masked the
>>>>>>>>>> issue, which again reared its head after the 2 days of lateness (instead
>>>>>>>>>> of the 1 day before) were over.
>>>>>>>>>> This is very disconcerting.
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey folks,
>>>>>>>>>>> I had a pipe with sessionization restart and then
>>>>>>>>>>> fail after retries with this exception. The only thing I had done was
>>>>>>>>>>> to increase the lateness by 12 hours (to a day) in this pipe and
>>>>>>>>>>> restart from a savepoint, and it ran for 12-plus hours without issue. I
>>>>>>>>>>> cannot imagine that increasing the lateness created this, and the way I
>>>>>>>>>>> solved it was to increase the lateness further. Could this happen if
>>>>>>>>>>> there are TMs in the cluster whose time is off (as in, not synchronized)?
>>>>>>>>>>>
>>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of
>>>>>>>>>>> an event-time window cannot become earlier than the current watermark
>>>>>>>>>>> by merging. Current watermark: 1618966593999 window:
>>>>>>>>>>> TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
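The two failures quoted in this thread can be cross-checked from the logged timestamps alone: subtracting each reported window end from the reported watermark shows the merged window's end lagged the watermark by roughly the lateness configured at the time (24 hours, then about 48 hours after the lateness was raised), which matches the observation that raising the lateness only postponed the failure. A minimal sketch of that arithmetic, using the values verbatim from the logs (the class and method names are the editor's, not Flink's):

```java
public class WatermarkGapCheck {
    // How far, in hours, a merged window's end lags the current watermark.
    static double hoursBehind(long currentWatermark, long windowEnd) {
        return (currentWatermark - windowEnd) / 3_600_000.0;
    }

    public static void main(String[] args) {
        // Earlier failure (lateness configured at 24 hours at the time):
        double first = hoursBehind(1_618_966_593_999L, 1_618_880_140_466L);
        // Later failure, after lateness was raised to 48 hours:
        double second = hoursBehind(1_619_053_742_129L, 1_618_879_580_402L);
        System.out.printf("first: %.1f hours, second: %.1f hours%n", first, second);
        // prints: first: 24.0 hours, second: 48.4 hours
    }
}
```

In both cases the gap is at or just past the configured allowedLateness, consistent with a session window lingering in state until its lateness expired and then being hit by a merge, as the exception message describes.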