After talking to David about this issue offline, I created the Jira ticket
FLINK-22425 [1] to cover it. Thanks for reporting it on the mailing list,
Vishal. Hopefully, the community will have a chance to look into it.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22425

On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <matth...@ververica.com>
wrote:

> This sounds strange to me. I would have expected it to work with
> `allowedLateness` and `sideOutput` defined. I'm pulling in David to have a
> look at it; maybe he has more insight. I haven't worked that much with
> lateness yet.
>
> Matthias
>
> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Correction to the last bullet of my previous mail; it should read:
>>
>> Added the filter upfront as below, and the pipe has no issues. Metrics
>> also show that no data is being pushed through the side output and that
>> data is *now* pulled from the simulated side output, which is essentially
>> the ProcessFunction with the predicate reversed relative to the filter
>> ProcessFunction.
>>
>>
>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> And when I added the filter, the exception was no longer thrown. The
>>> sequence of events:
>>>
>>> * Increased the lateness from 12 hours (what it was initially running
>>> with) to 24 hours.
>>> * The pipe ran as desired before it blew up with the exception.
>>> * Masked the issue by increasing the lateness to 48 hours.
>>> * It blew up again, but only after the added lateness; essentially the
>>> same issue, but the added lateness let the pipe run for another few hours.
>>> * Added the filter upfront as below, and the pipe has no issues. Metrics
>>> also show that no data is being pushed through the side output and that
>>> data is *now* pulled from the simulated side output (below).
>>>
>>>
>>> // Forwards only elements that are not yet late, i.e. whose timestamp plus
>>> // the allowed lateness is still ahead of the current watermark.
>>> public class LateEventFilter extends
>>>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     long allowedLateness; // in milliseconds
>>>
>>>     public LateEventFilter(long allowedLateness) {
>>>         this.allowedLateness = allowedLateness;
>>>     }
>>>
>>>     @Override
>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>             out.collect(value);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> // Forwards only the late elements: the exact complement of LateEventFilter.
>>> public class LateEventSideOutput extends
>>>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     long allowedLateness; // in milliseconds
>>>
>>>     public LateEventSideOutput(long allowedLateness) {
>>>         this.allowedLateness = allowedLateness;
>>>     }
>>>
>>>     @Override
>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>         // Late: timestamp + allowed lateness is at or behind the watermark.
>>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>             out.collect(value);
>>>         }
>>>     }
>>> }
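Since `LateEventFilter` and `LateEventSideOutput` use exactly complementary predicates, every element should end up in exactly one of the two streams. The plain-Java sketch below models that split outside Flink so the boundary case can be checked; the class `LateSplitSketch` and its methods are hypothetical, and timestamps and the watermark are assumed to be plain `long` milliseconds. In Flink itself, one `ProcessFunction` could probably do both jobs in a single pass by emitting late elements via `ctx.output(...)` with an `OutputTag`, instead of reading `keyedValue` twice.

```java
import java.util.ArrayList;
import java.util.List;

public class LateSplitSketch {

    // Same predicate as LateEventSideOutput: an element is late once
    // timestamp + allowedLateness has reached the watermark.
    public static boolean isLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    // Routes each timestamp to exactly one list: index 0 holds what
    // LateEventFilter would forward, index 1 what LateEventSideOutput would.
    public static List<List<Long>> split(List<Long> timestamps, long allowedLateness, long watermark) {
        List<Long> kept = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isLate(ts, allowedLateness, watermark)) {
                late.add(ts);
            } else {
                kept.add(ts);
            }
        }
        return List.of(kept, late);
    }

    public static void main(String[] args) {
        List<List<Long>> result = split(List.of(80L, 90L, 91L, 120L), 10, 100);
        // Prints: kept=[91, 120] late=[80, 90]
        System.out.println("kept=" + result.get(0) + " late=" + result.get(1));
    }
}
```

With a lateness of 10 and a watermark of 100, the element at 90 sits exactly on the boundary (90 + 10 == 100) and goes to the late branch, just as the `<=` in `LateEventSideOutput` treats it.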
>>>
>>>
>>>
>>> I am using RocksDB as the state backend, if that helps.
>>>
>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Yes, sir. The allowedLateness and the side output were always there.
>>>>
>>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com>
>>>> wrote:
>>>>
>>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>>>> being added to your pipeline when running into the
>>>>> UnsupportedOperationException issue previously?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>
>>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> As in, this is essentially doing what lateness *should* have done,
>>>>>> and I think that is a bug. My code now looks as follows. Please look at
>>>>>> the allowedLateness on the session window.
>>>>>>
>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>>>         .name("late_filter").uid("late_filter");
>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>>>         .name("late_data").uid("late_data");
>>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>>         .keyBy(value -> value.getKey())
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>>         .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>>         .sideOutputLateData(lateOutputTag)
>>>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>>         .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes,
>>>>>>                         this.lateNessInMinutes))
>>>>>>         .name("session_aggregate").uid("session_aggregate");
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>>> you please validate it? That aside, I already have lateness configured
>>>>>>> for the session window (the normal allowedLateness()), and this looks
>>>>>>> like a session window was not collected and is still alive for some
>>>>>>> reason (a Flink bug?).
>>>>>>>
>>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>>     out.collect(value);
>>>>>>> }
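To reason about the hypothesis that a session window was never collected, the simplified model below mimics how session windows merge; it is a sketch, not Flink's code. It assumes that each element gets a window `[timestamp, timestamp + gap)`, that two windows merge when they touch or overlap (modelled on Flink's `TimeWindow.intersects`), and that the merge is rejected when the merged window's `maxTimestamp()` (`end - 1`) plus the allowed lateness is at or before the watermark, which is what the exception message describes. `SessionMergeSketch`, `Window`, `assign` and `mergeIsRejected` are hypothetical names.

```java
public class SessionMergeSketch {

    // Simplified stand-in for a session window [start, end); not Flink's TimeWindow.
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last timestamp that still belongs to the window.
        public long maxTimestamp() {
            return end - 1;
        }

        // Windows that touch or overlap are merged.
        public boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window covering both.
        public Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    // Each element opens a session window [timestamp, timestamp + gap).
    public static Window assign(long timestamp, long gap) {
        return new Window(timestamp, timestamp + gap);
    }

    // The condition the exception message describes: the merged window would
    // already be past its cleanup time (maxTimestamp + allowedLateness).
    public static boolean mergeIsRejected(Window merged, long allowedLateness, long watermark) {
        return merged.maxTimestamp() + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long gap = 30;
        long lateness = 60;
        long watermark = 200;

        // Hypothetical existing window that is past its cleanup time
        // (129 + 60 = 189 <= 200) but is still registered.
        Window existing = assign(100, gap);
        // A late element (110 + 60 <= 200) whose window [110, 140) overlaps it.
        Window incoming = assign(110, gap);

        if (incoming.intersects(existing)) {
            Window merged = incoming.cover(existing);
            // Prints: merged=[100, 140) rejected=true
            System.out.println("merged=[" + merged.start + ", " + merged.end + ") rejected="
                    + mergeIsRejected(merged, lateness, watermark));
        }
    }
}
```

In this model, a rejected merge needs an incoming element that is itself late: if `timestamp + allowedLateness > watermark`, the merged window ends at or after `timestamp + gap`, so it cannot fail the check. By the same arithmetic, the existing window it merges with must also be past its cleanup time, which fits both the upfront filter making the exception disappear and the suspicion that a stale session window was still alive.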
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <
>>>>>>> matth...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>> based on the error message and the behavior you described,
>>>>>>>> introducing a filter for late events is the way to go, just as
>>>>>>>> described in the SO thread you mentioned. Usually, you would collect
>>>>>>>> late events in some kind of side output [1].
>>>>>>>>
>>>>>>>> I hope that helps.
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I saw
>>>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>>>>>>> and it seems to suggest a straight-up filter, but I am not sure how
>>>>>>>>> that filter works. Would it factor in the lateness when filtering?
>>>>>>>>>
>>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Well, it was not a solution after all. We now have a session
>>>>>>>>>> window that is stuck with the same issue, albeit after the additional
>>>>>>>>>> lateness. I had increased the lateness to 2 days, and that masked the
>>>>>>>>>> issue, which reared its head again once the 2 days of lateness were
>>>>>>>>>> over (instead of the 1 day before). This is very disconcerting.
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
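A quick way to double-check the reported numbers is to compute how far each merged window is past its cleanup time. The sketch below assumes the rejection condition is `maxTimestamp + allowedLateness <= watermark`, where `maxTimestamp` is `end - 1` as in Flink's `TimeWindow`, and it uses the lateness configured at each failure: one day for the original report further down the thread, two days for the failure above. `CleanupArithmetic` and `msPastCleanup` are hypothetical names.

```java
public class CleanupArithmetic {

    private static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

    // How many milliseconds the watermark is past the window's cleanup time,
    // assuming cleanup time = maxTimestamp + allowedLateness and maxTimestamp = end - 1.
    // A result >= 0 means the merge would be rejected.
    public static long msPastCleanup(long windowEnd, long allowedLateness, long watermark) {
        return watermark - (windowEnd - 1 + allowedLateness);
    }

    public static void main(String[] args) {
        // Original report: one day of lateness.
        System.out.println(msPastCleanup(1618880140466L, ONE_DAY_MS, 1618966593999L)); // 53534
        // Later failure: two days of lateness.
        System.out.println(msPastCleanup(1618879580402L, 2 * ONE_DAY_MS, 1619053742129L)); // 1361728
    }
}
```

Under those assumptions, the first window is about 54 seconds (53,534 ms) past its cleanup time and the second about 22.7 minutes (1,361,728 ms), so both exceptions are consistent with the lateness configured at the time.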
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey folks,
>>>>>>>>>>> I had a pipe with sessionization restart and then fail after retries
>>>>>>>>>>> with this exception. The only thing I had done was to increase the
>>>>>>>>>>> lateness by 12 hours (to a day) in this pipe and restart from a
>>>>>>>>>>> savepoint (SP), and it ran for 12+ hours without issue. I cannot
>>>>>>>>>>> imagine that increasing the lateness caused this, and the way I solved
>>>>>>>>>>> it was to increase the lateness further. Could this happen if there are
>>>>>>>>>>> TMs in the cluster whose clocks are off (as in, not synchronized)?
>>>>>>>>>>>
>>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>>>>>>>> event-time window cannot become earlier than the current watermark by
>>>>>>>>>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)