To me, it sounds strange. I would have expected it to work with `allowedLateness` and `sideOutputLateData` defined. I'm pulling in David to have a look at it. Maybe he has some more insights; I haven't worked much with lateness yet.
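The stack trace quoted below shows where the exception is raised: from `MergingWindowSet.addWindow`, called inside `WindowOperator.processElement`. In other words, it happens while the new element's session window is being merged. Our reading of the operator, which is not confirmed in this thread, is that the late-data check feeding `sideOutputLateData` only runs after that merge. If so, `allowedLateness` plus a side output cannot prevent the exception while a stale window is still present, because a late element that overlaps it fails during the merge. The following hypothetical, simplified Java model illustrates that ordering. It is not Flink code: `SessionMergeModel`, `advanceWatermark` and its `runCleanup` switch are invented for illustration. It also shows why an upfront filter using the `LateEventFilter` predicate avoids the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of an event-time session-window operator.
// NOT Flink's WindowOperator: it only reproduces the assumed order of checks
// (merge check first, late-data handling second).
class SessionMergeModel {

    // Half-open window [start, end); maxTimestamp() is end - 1, as in Flink's TimeWindow.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    final long gap;
    final long allowedLateness;
    long watermark = Long.MIN_VALUE;
    final List<Window> windows = new ArrayList<>();       // live, non-overlapping windows
    final List<Long> lateSideOutput = new ArrayList<>();  // stands in for sideOutputLateData

    SessionMergeModel(long gap, long allowedLateness) {
        this.gap = gap;
        this.allowedLateness = allowedLateness;
    }

    // Same predicate as LateEventFilter in the thread: keep elements that are not late.
    static boolean passesLateEventFilter(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness > watermark;
    }

    void processElement(long timestamp) {
        Window merged = new Window(timestamp, timestamp + gap);
        List<Window> absorbed = new ArrayList<>();
        for (Window existing : windows) {
            if (existing.intersects(merged)) {
                merged = merged.cover(existing);
                absorbed.add(existing);
            }
        }
        // Step 1: a merge that yields an already-expired window throws,
        // before any lateness handling gets a chance to run.
        if (!absorbed.isEmpty() && merged.maxTimestamp() + allowedLateness <= watermark) {
            throw new UnsupportedOperationException(
                "window [" + merged.start + ", " + merged.end + ") is behind watermark " + watermark);
        }
        windows.removeAll(absorbed);
        // Step 2: only a window that did not merge can be late here; its element
        // goes to the late side output instead of into a window.
        if (merged.maxTimestamp() + allowedLateness <= watermark) {
            lateSideOutput.add(timestamp);
            return;
        }
        windows.add(merged);
    }

    // Advances the watermark. runCleanup=false simulates a window whose cleanup
    // (due at maxTimestamp + allowedLateness) never happened, i.e. a "stuck" window.
    void advanceWatermark(long newWatermark, boolean runCleanup) {
        watermark = newWatermark;
        if (runCleanup) {
            windows.removeIf(w -> w.maxTimestamp() + allowedLateness <= newWatermark);
        }
    }

    public static void main(String[] args) {
        SessionMergeModel op = new SessionMergeModel(10, 5);
        op.processElement(100);           // opens session [100, 110)
        op.advanceWatermark(200, false);  // [100, 110) should expire at 114 but is kept
        try {
            op.processElement(105);       // late element that overlaps the stuck window
        } catch (UnsupportedOperationException e) {
            // prints "merge failed; side output size = 0"
            System.out.println("merge failed; side output size = " + op.lateSideOutput.size());
        }
    }
}
```

With `runCleanup` set to `true`, the same late element ends up in `lateSideOutput` instead. That is the behavior one would expect from `allowedLateness` and `sideOutputLateData`. In both cases, `passesLateEventFilter(105, 5, 200)` is `false`, so an upfront filter never lets that element reach the merge.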
Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> << Added the Fliter upfront as below, the pipe has no issues. Also metrics show that no data is being pushed through the sideoutput and that data in not pulled from the a simulated sideout ( below )
>
> >> Added the Filter upfront as below; the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is *now* pulled from the simulated side output, essentially the ProcessFunction with the reverse predicate of the filter ProcessFunction.
>
> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> And when I added the filter, the exception was not thrown. So the sequence of events:
>>
>> * Increased lateness from 12 hours (what it was initially running with) to 24 hours.
>> * The pipe ran as desired before it blew up with the exception.
>> * Masked the issue by increasing the lateness to 48 hours.
>> * It blew up again, but only after the added lateness. Essentially the same issue, but the added lateness let the pipe run for another few hours.
>> * Added the Filter upfront as below; the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data in not pulled from the a simulated sideout ( below )
>>
>> public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     long allowedLateness;
>>
>>     public LateEventFilter(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         // Forward only elements that are not late: timestamp + allowedLateness is still ahead of the watermark.
>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>> public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     long allowedLateness;
>>
>>     public LateEventSideOutput(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         // Forward only the late elements: the exact complement of LateEventFilter.
>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>> I am using RocksDB as the state backend, if that helps.
>>
>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Yes sir. The allowedLateness and side output always existed.
>>>
>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>
>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1], but without the `LateEventFilter`/`LateEventSideOutput` added to your pipeline, when you previously ran into the UnsupportedOperationException issue?
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> As in, this is essentially doing what lateness *should* have done, and I think that is a bug. My code now is as follows. Please look at the allowedLateness on the session window.
>>>>>
>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>>         .name("late_filter").uid("late_filter");
>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>>         .name("late_data").uid("late_data");
>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>         .keyBy(value -> value.getKey())
>>>>>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>         .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>         .sideOutputLateData(lateOutputTag)
>>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>         .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>>>>         .name("session_aggregate").uid("session_aggregate");
>>>>>
>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I can do that, but I am not certain this is the right filter. Can you please validate it? That aside, I already have the lateness configured for the session window (the normal allowedLateness()), and this looks like a session window was not collected and is still alive for some reason (a Flink bug?).
>>>>>>
>>>>>>     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>         out.collect(value);
>>>>>>     }
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>> based on the error message and the behavior you described, introducing a filter for late events is the way to go, just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].
>>>>>>>
>>>>>>> I hope that helps.
>>>>>>> Matthias
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I saw https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current. It seems to suggest a straight-up filter, but I am not sure how that filter works. As in: would it factor in the lateness when filtering?
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Well, it was not a solution after all. We now have a session window that is stuck with the same issue, albeit after the additional lateness. I had increased the lateness to 2 days, and that masked the issue. It reared its head again once the 2 days of lateness were over (instead of the 1 day before). This is very disconcerting.
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>>>>
>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey folks,
>>>>>>>>>> I had a pipe with sessionization that restarts and then fails after retries with this exception. The only thing I had done was to increase the lateness by 12 hours (to a day) in this pipe and restart from a savepoint. It then ran for more than 12 hours without issue. I cannot imagine that increasing the lateness created this, and the way I solved it was to increase the lateness further. Could this happen if there are TMs in the cluster whose clocks are off (as in, not synchronized)?
>>>>>>>>>>
>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
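The timestamps in the two exceptions above can be checked against the lateness in effect at the time: 24 hours for the first failure and 48 hours for the second. The sketch below assumes the merge check is `maxTimestamp + allowedLateness <= currentWatermark`, with `maxTimestamp = end - 1` as for Flink's `TimeWindow`. `LatenessCheck`, `mergeWouldFail` and `overshootMs` are hypothetical helpers, not Flink API.

```java
import java.time.Duration;

// Hypothetical helpers for checking the exception timestamps from the thread.
// Assumed condition: merging fails when (end - 1) + allowedLateness <= currentWatermark.
class LatenessCheck {

    static boolean mergeWouldFail(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1; // TimeWindow.maxTimestamp() is end - 1
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    // How many milliseconds the watermark is past the window's cleanup time.
    static long overshootMs(long windowEnd, long allowedLatenessMs, long watermark) {
        return watermark - (windowEnd - 1 + allowedLatenessMs);
    }

    public static void main(String[] args) {
        long day = Duration.ofHours(24).toMillis();

        // First failure, lateness of 24 hours
        long wm1 = 1618966593999L, end1 = 1618880140466L;
        System.out.println(mergeWouldFail(end1, day, wm1));     // true
        System.out.println(overshootMs(end1, day, wm1));        // 53534

        // Second failure, lateness of 48 hours
        long wm2 = 1619053742129L, end2 = 1618879580402L;
        System.out.println(mergeWouldFail(end2, 2 * day, wm2)); // true
        System.out.println(overshootMs(end2, 2 * day, wm2));    // 1361728
    }
}
```

Under that assumption, both merged windows were already past their cleanup time, by about 53.5 seconds and about 22.7 minutes respectively. A merge result covers every window it absorbs. So the pre-existing window involved in each merge must have been at least as far past its own cleanup time. That fits the suspicion in the thread that a session window was not cleaned up.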