In other words, the filter is essentially doing what the allowed lateness
*should* already have done, and I think that is a bug. My code now looks like
the following. Please look at the allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
    .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
    .name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
    .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
    .name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
    .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
    .keyBy(value -> value.getKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
    .allowedLateness(Time.minutes(lateNessInMinutes))
    .sideOutputLateData(lateOutputTag)
    .trigger(PurgingTrigger.of(CountTrigger.of(1)))
    .aggregate(new SortAggregate<KEY, VALUE>(),
        new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
    .name("session_aggregate").uid("session_aggregate");
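
As a sanity check on the numbers in the two exceptions quoted below, here is a
minimal stdlib-only Java sketch (no Flink dependency). The class and method
names (LatenessCheck, withinLateness, lagMs) are made up for illustration and
are not part of the pipeline; withinLateness only restates the condition from
the filter quoted below. Under those assumptions it suggests that in both
failures the watermark had passed the window end by slightly more than the
configured lateness (1 day, then 2 days).

```java
import java.time.Duration;

public class LatenessCheck {

    // Restates the filter condition quoted in this thread: an element is kept
    // while its timestamp plus the allowed lateness is still ahead of the watermark.
    static boolean withinLateness(long timestampMs, long allowedLatenessMs, long watermarkMs) {
        return timestampMs + allowedLatenessMs > watermarkMs;
    }

    // How far (in ms) the watermark has moved past the end of a window.
    static long lagMs(long watermarkMs, long windowEndMs) {
        return watermarkMs - windowEndMs;
    }

    public static void main(String[] args) {
        long oneDayMs = Duration.ofDays(1).toMillis();
        long twoDaysMs = Duration.ofDays(2).toMillis();

        // Values copied from the first exception (lateness was 1 day).
        long lag1 = lagMs(1618966593999L, 1618880140466L);
        // Values copied from the second exception (lateness was 2 days).
        long lag2 = lagMs(1619053742129L, 1618879580402L);

        System.out.println("first failure:  lag=" + lag1 + " ms, over 1 day by "
                + (lag1 - oneDayMs) + " ms");
        System.out.println("second failure: lag=" + lag2 + " ms, over 2 days by "
                + (lag2 - twoDaysMs) + " ms");

        // An element stamped at the start of the second window, checked against
        // the watermark from the second exception with 2 days of lateness.
        System.out.println("kept by filter: "
                + withinLateness(1618877773663L, twoDaysMs, 1619053742129L));
    }
}
```

In both cases the overshoot is small compared with the lateness itself (about
54 seconds and about 23 minutes), which fits the observation that raising the
lateness only postponed the failure. The last line prints false, meaning the
quoted filter would drop such an element before it reaches the session window.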

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I can do that, but I am not certain this is the right filter. Can you
> please validate it? That aside, I already have the lateness configured for
> the session window (the normal allowedLateness()), and this looks like a
> session window that was not collected and is still alive for some reason
> (a Flink bug?).
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>     out.collect(value);
> }
>
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described, introducing a
>> filter for late events is the way to go - just as described in the SO
>> thread you mentioned. Usually, you would collect late events in some kind
>> of side output [1].
>>
>> I hope that helps.
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I saw
>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>> and it seems to suggest a straight-up filter, but I am not sure how that
>>> filter works. Would it factor in the lateness when filtering?
>>>
>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Well, it was not a solution after all. We now have a session window that
>>>> is stuck with the same issue, albeit after the additional lateness. I had
>>>> increased the lateness to 2 days, and that masked the issue, which
>>>> reared its head again once the 2 days of lateness were over (instead of
>>>> the 1 day before). This is very disconcerting.
>>>>
>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp
>>>> of an event-time window cannot become earlier than the current
>>>> watermark by merging. Current watermark: 1619053742129 window:
>>>> TimeWindow{start=1618877773663, end=1618879580402}
>>>>
>>>>
>>>>
>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hey folks,
>>>>> I had a pipe with sessionization that restarts and then fails after
>>>>> retries with this exception. The only thing I had done was to increase
>>>>> the lateness by 12 hours (to a day) in this pipe and restart from a
>>>>> savepoint, and it ran for 12 hours plus without issue. I cannot imagine
>>>>> that increasing the lateness created this, and the way I solved it was to
>>>>> increase the lateness further. Could this happen if there are TMs in the
>>>>> cluster whose time is off (as in not synchronized)?
>>>>>
>>>>> 2021-04-21 11:27:58
>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>> event-time window cannot become earlier than the current watermark by
>>>>> merging. Current watermark: 1618966593999 window:
>>>>> TimeWindow{start=1618878336107, end=1618880140466}
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>>
>>>>>
>>
