We are hitting the error shown below when running our Flink job. We have several processing-time windows, but the failure appears to be related to a TumblingProcessingTimeWindows window. Checkpoints fail partway through instead of completing. The code block for the window is:

    .keyBy(order -> getKey(order))
    .window(TumblingProcessingTimeWindows.of(BATCH_SINK_WINDOW_DURATION))
    .trigger(batchSinkTrigger)
    .apply(
        new WindowFunction<EnrichedOrder, Iterable<EnrichedOrder>, String, TimeWindow>() {
            @Override
            public void apply(
                    String key,
                    TimeWindow window,
                    Iterable<EnrichedOrder> values,
                    Collector<Iterable<EnrichedOrder>> out) {
                out.collect(values);
            }
        })

We also have another set of ProcessingTimeWindows but this one seems to be succeeding during checkpointing (completing through all partitions):

    keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(this.windowTimeGapMs)));

It looks pretty basic, but we're getting the following error.

2022-04-15 12:56:33
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1650052588226 window: TimeWindow{start=1650052587949, end=1650052588136}
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:338)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:312)
    at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:310)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)

This seems like something that shouldn't happen with processing time windows. We did find this bug which appears similar to what we're experiencing.

https://issues.apache.org/jira/browse/FLINK-12872

Thanks.
Jai
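
P.S. To make the numbers in the exception easier to read, here is a minimal sketch of the comparison the message describes, using the timestamps from our log. It is our own illustration and not Flink's code: the class and method names are hypothetical, and it assumes a window's maximum timestamp is its end minus one millisecond, as TimeWindow.maxTimestamp() returns.

```java
/** Hypothetical illustration of the check described by the error; not Flink code. */
final class MergeCheckSketch {

    // Values copied from the exception message in our log.
    static final long CURRENT_PROCESSING_TIME = 1650052588226L;
    static final long WINDOW_START = 1650052587949L;
    static final long WINDOW_END = 1650052588136L;

    /** Assumption: the last timestamp belonging to a window is end - 1. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True when the window had already ended at the given processing time. */
    static boolean endsBeforeNow(long windowEnd, long now) {
        return maxTimestamp(windowEnd) <= now;
    }

    public static void main(String[] args) {
        System.out.println("window length (ms): " + (WINDOW_END - WINDOW_START));
        System.out.println("ms past window end: " + (CURRENT_PROCESSING_TIME - WINDOW_END));
        System.out.println("ends before now:    "
                + endsBeforeNow(WINDOW_END, CURRENT_PROCESSING_TIME));
    }
}
```

With the values from our log, the window is 187 ms long and had already ended 90 ms before the reported processing time.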