We are encountering the following error when running our Flink job. We
have several processing-time windows, but the error appears to be related
to a TumblingProcessingTimeWindows window. Checkpoints fail midway and never
complete. The code for the window is:

        .keyBy(order -> getKey(order))
        .window(TumblingProcessingTimeWindows.of(BATCH_SINK_WINDOW_DURATION))
        .trigger(batchSinkTrigger)
        .apply(
            new WindowFunction<EnrichedOrder, Iterable<EnrichedOrder>, String, TimeWindow>() {
              @Override
              public void apply(
                  String key,
                  TimeWindow window,
                  Iterable<EnrichedOrder> values,
                  Collector<Iterable<EnrichedOrder>> out) {
                // Emit all orders collected for this key in this window as one batch.
                out.collect(values);
              }
            })
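For context, tumbling windows are not merging windows: each element is placed in exactly one fixed, aligned window, and WindowOperator only goes through MergingWindowSet (the frames near the top of the trace below) for merging assigners such as session windows. The sketch below is a simplified plain-Java reimplementation of the alignment arithmetic, not Flink's code; it assumes a zero window offset and a hypothetical 1-second value for BATCH_SINK_WINDOW_DURATION, and the class and method names are our own.

```java
// Hypothetical, simplified reimplementation for illustration only (not Flink's
// code): the aligned-window arithmetic a tumbling assigner uses, assuming a
// zero offset.
final class TumblingWindowSketch {

    private TumblingWindowSketch() {}

    /** Start of the aligned window of size sizeMs that contains nowMs. */
    static long windowStart(long nowMs, long sizeMs) {
        // floorMod keeps the start aligned even for negative timestamps.
        return nowMs - Math.floorMod(nowMs, sizeMs);
    }

    /** Exclusive end of that window; it is fixed as soon as the start is known. */
    static long windowEnd(long nowMs, long sizeMs) {
        return windowStart(nowMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 1_000L; // hypothetical stand-in for BATCH_SINK_WINDOW_DURATION
        long now = 1_650_052_588_226L;
        // Every element processed between ...588000 and ...588999 lands in the
        // same window, so there is never anything to merge.
        System.out.println(windowStart(now, size) + " .. " + windowEnd(now, size));
    }
}
```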

We also have another set of processing-time windows (session windows), but
this one seems to succeed during checkpointing (it completes through all
partitions):

        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(this.windowTimeGapMs)));
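ProcessingTimeSessionWindows, unlike the tumbling assigner, is a merging assigner. As we understand it, each element gets its own window starting at the current processing time and lasting the gap, and overlapping windows are then merged into one covering window. The sketch below is a simplified plain-Java reimplementation of that idea, not Flink's API; the Window class, the method names and the 100 ms gap (a stand-in for windowTimeGapMs) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified reimplementation for illustration only (not Flink's
// code): each element opens a session window [now, now + gap), and windows that
// overlap or touch are merged into one window that covers them all.
final class SessionMergeSketch {

    private SessionMergeSketch() {}

    /** Half-open window [start, end) in epoch milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "Window{start=" + start + ", end=" + end + "}";
        }
    }

    /** Window a processing-time session assigner would give an element arriving at nowMs. */
    static Window assign(long nowMs, long gapMs) {
        return new Window(nowMs, nowMs + gapMs);
    }

    /** Merges overlapping windows, scanning them in order of start time. */
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long gap = 100L; // hypothetical stand-in for windowTimeGapMs
        List<Window> windows = new ArrayList<>();
        windows.add(assign(1_000L, gap)); // [1000, 1100)
        windows.add(assign(1_050L, gap)); // [1050, 1150) overlaps the first
        windows.add(assign(1_300L, gap)); // [1300, 1400) stays separate
        // prints [Window{start=1000, end=1150}, Window{start=1300, end=1400}]
        System.out.println(merge(windows));
    }
}
```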

The setup looks pretty basic, but we are getting the following error:

2022-04-15 12:56:33
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1650052588226 window: TimeWindow{start=1650052587949, end=1650052588136}
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:338)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:312)
    at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:310)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
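Plugging the numbers from the exception message into the condition it describes shows that the merged window had already ended when it was produced: it spans 187 ms, and its end is 90 ms before the reported processing time. The sketch below assumes the check compares the window's last valid timestamp (end - 1) with the current processing time; that matches the wording of the message but is our reading, not a quote of WindowOperator, and the class and method names are hypothetical.

```java
// Hypothetical, simplified reimplementation for illustration only (not Flink's
// code) of the condition behind the exception above, as we understand it: a
// merged processing-time window is rejected when its last valid timestamp
// (end - 1) is not after the current processing time.
final class MergeCheckSketch {

    private MergeCheckSketch() {}

    /** Last timestamp still inside the half-open window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True when merging would produce a window that has already expired. */
    static boolean wouldReject(long windowEnd, long currentProcessingTime) {
        return maxTimestamp(windowEnd) <= currentProcessingTime;
    }

    public static void main(String[] args) {
        // Values copied from the exception message in the stack trace.
        long currentProcessingTime = 1_650_052_588_226L;
        long windowStart = 1_650_052_587_949L;
        long windowEnd = 1_650_052_588_136L;

        System.out.println("window length ms: " + (windowEnd - windowStart)); // 187
        System.out.println("end behind processing time by ms: "
                + (currentProcessingTime - windowEnd)); // 90
        System.out.println("rejected: " + wouldReject(windowEnd, currentProcessingTime)); // true
    }
}
```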

This seems like something that should not happen with processing-time
windows. We did find the following bug, which appears similar to what we
are experiencing: https://issues.apache.org/jira/browse/FLINK-12872

Thanks.
Jai
