The only thing I can think of is to add the configured lateness to the
filter, as in here: the element's timestamp + lateness should always be
greater than the current WM. The current issue is:

Window end: Mon Apr 19 20:46:20 EDT 2021
WM:         Wed Apr 21 21:09:02 EDT 2021

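The two dates appear to be the merged window end and the watermark from the most recent exception (1618879580402 and 1619053742129 ms), converted to US Eastern time. A minimal sketch to check that conversion; the class name EpochToEastern is hypothetical, and the zone is assumed to be America/New_York.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper: renders epoch-millisecond values from the exception in US Eastern time. */
public class EpochToEastern {

    // Same layout as java.util.Date#toString, e.g. "Mon Apr 19 20:46:20 EDT 2021"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);

    /** Assumes the cluster's reporting zone is America/New_York (EDT in April). */
    public static ZonedDateTime toEastern(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("America/New_York"));
    }

    public static void main(String[] args) {
        System.out.println("Window end: " + FORMAT.format(toEastern(1618879580402L)));
        System.out.println("WM:         " + FORMAT.format(toEastern(1619053742129L)));
    }
}
```

Running it should reproduce the two dates above, which confirms they refer to the same window and watermark as the `Caused by` exception.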

An event forced this merged window, and it most likely has the timestamp
Mon Apr 19 20:46:20 EDT 2021. We are filtering such events out so that we
do not hit
https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125


Either way, the solution is yucky, and I am not sure how this happened in
the first place.


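import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Drops elements whose timestamp + allowed lateness is already at or behind the
// current watermark, so they never reach the merging (session) window operator.
// KeyedTimedValue is the pipeline's own record type, not a Flink class.
public class LateEventFilter<KEY, VALUE>
        extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    private final long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // ctx.timestamp() returns null when the element has no timestamp; forward it
        // unchanged instead of failing with a NullPointerException on unboxing.
        Long ts = ctx.timestamp();
        if (ts == null || ts + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}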
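A dependency-free sketch of why the lateness-aware filter should be enough, under two assumptions: that the merge callback in WindowOperator rejects a merged window when its maxTimestamp() (end - 1) plus the allowed lateness is at or before the current watermark, and that the filter uses the same allowedLateness as the window. The class LatenessCheck and the 30-minute session gap are hypothetical; the watermark and window end are taken from the `Caused by` exception in this thread.

```java
/**
 * Hypothetical helper (not part of Flink): compares the LateEventFilter condition
 * with the condition assumed to make WindowOperator throw during a merge.
 */
public class LatenessCheck {

    /** LateEventFilter condition: keep the element if ts + lateness > watermark. */
    public static boolean keepElement(long elementTs, long allowedLateness, long watermark) {
        return elementTs + allowedLateness > watermark;
    }

    /** Assumed merge check: maxTimestamp (end - 1) + lateness <= watermark means "throw". */
    public static boolean mergeWouldThrow(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long twoDays = 2L * 24 * 60 * 60 * 1000;
        long watermark = 1619053742129L; // "Current watermark" from the exception
        long windowEnd = 1618879580402L; // merged window end from the same exception

        System.out.println("merge would throw: " + mergeWouldThrow(windowEnd, twoDays, watermark));

        // A session window for an element with timestamp ts ends at ts + gap, so any merged
        // window containing it ends no earlier. If keepElement(...) holds and gap >= 1 ms,
        // then (ts + gap - 1) + lateness >= ts + lateness > watermark: the check cannot fire.
        long gap = 30L * 60 * 1000;    // hypothetical 30-minute session gap
        long lateTs = windowEnd - gap; // element whose own session would end at windowEnd
        System.out.println("filter keeps late element: " + keepElement(lateTs, twoDays, watermark));
    }
}
```

With the values from the exception, the merge check fires and the filter drops the element that would have produced that window, which matches the intent of the filter above.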

On Thu, Apr 22, 2021 at 8:52 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
> which seems to suggest a straight filter, but I am not sure how that filter
> works: would it factor in the lateness when filtering?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Well, it was not a solution after all. We now have a session window that
>> is stuck with the same issue, albeit after the additional lateness. I had
>> increased the lateness to 2 days, and that masked the issue, which reared
>> its head again once the 2 days of lateness were over (instead of 1 day as
>> before). This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>>> Hey folks,
>>> I had a sessionization pipeline that restarts and then fails after
>>> retries with this exception. The only thing I had done was to increase
>>> the lateness by 12 hours (to a day) in this pipeline and restart from a
>>> savepoint (SP), and it ran for more than 12 hours without issue. I cannot
>>> imagine that increasing the lateness caused this, and the way I solved it
>>> was to increase the lateness further. Could this happen if there are TMs
>>> in the cluster whose clocks are off (not synchronized)?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>

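For both exceptions in this thread, a small arithmetic sketch of how far the watermark had moved past window end + allowed lateness when the merge failed. It assumes the lateness in effect was 1 day for the first trace (1618966593999 / 1618880140466) and 2 days for the second (1619053742129 / 1618879580402); the class LatenessOvershoot is hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper: how far past the allowed lateness each failing merged window was. */
public class LatenessOvershoot {

    /** watermark - (windowEnd + allowedLateness); positive means the window was already too late. */
    public static Duration overshoot(long windowEnd, long watermark, Duration allowedLateness) {
        return Duration.ofMillis(watermark - windowEnd).minus(allowedLateness);
    }

    public static void main(String[] args) {
        // Second trace, lateness raised to 2 days
        System.out.println(Duration.ofMillis(1619053742129L - 1618879580402L)); // PT48H22M41.727S
        System.out.println(overshoot(1618879580402L, 1619053742129L, Duration.ofDays(2))); // PT22M41.727S
        // First trace, lateness of 1 day
        System.out.println(overshoot(1618880140466L, 1618966593999L, Duration.ofDays(1))); // PT53.533S
    }
}
```

In both cases the merged window was only just past its lateness bound (about 54 seconds and about 23 minutes), which is consistent with raising the lateness merely postponing the failure rather than removing it.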