I can do that, but I am not certain this is the right filter.  Can you
please validate. That aside I already have the lateness configured for the
session window ( the normal withLateNess() )  and this looks like a session
window was not collected and still is alive for some reason ( a flink bug ?
)

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark())
{
out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing a
> filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight up filter, but I am not sure how does
>> that filter works as in would it factor is the lateness when filtering ?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Well it was not a solution after all. We now have a session window that
>>> is stuck with the same issue albeit  after the additional lateness. I had
>>> increased the lateness to 2 days and that masked the issue which again
>>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>>> ) before. This is very disconcerting.
>>>
>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>>> an event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>>> 1618877773663, end=1618879580402}
>>>
>>>
>>>
>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Hey folks,
>>>>                I had a pipe with sessionization restarts and then fail
>>>> after retries with this exception. The only thing I had done was to
>>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>>> increasing the lateness created this and the way I solved this was to
>>>> increase the lateness further. Could this be if there are TMs in the
>>>> cluster whose time is off ( as in not synchronized )  ?
>>>>
>>>> 2021-04-21 11:27:58
>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>> event-time window cannot become earlier than the current watermark by
>>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>>>> 1618878336107, end=1618880140466}
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>> WindowOperator$2.merge(WindowOperator.java:339)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>> WindowOperator$2.merge(WindowOperator.java:321)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>> WindowOperator.processElement(WindowOperator.java:319)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
>>>> OneInputStreamTask.java:191)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>> .processElement(StreamTaskNetworkInput.java:204)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>> .emitNext(StreamTaskNetworkInput.java:174)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>>> .processInput(StreamOneInputProcessor.java:65)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .processInput(StreamTask.java:396)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .runMailboxLoop(StreamTask.java:617)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:581)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>
>

Reply via email to