Here's our custom trigger. We thought about switching to ProcessingTimeoutTrigger.of(CountTrigger.of(100), Duration.ofMinutes(1)), but I'm not sure that'll fire properly when the window closes.
Thanks.
Jai

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link Trigger} that fires once the current system time passes the end of the window to which a
 * pane belongs or the number of elements in the pane exceeds the {@code maxCount}.
 *
 * <p>Note, all elements in the window will be purged once this trigger is fired.
 */
// TODO: This class should be replaced by ProcessingTimeoutTrigger
// once we have updated to flink 1.12.0. https://issues.apache.org/jira/browse/FLINK-17058
public class ProcessingTimeOrCountTrigger extends Trigger<Object, TimeWindow> {

  private final Trigger<Object, TimeWindow> countTrigger;
  private final Trigger<Object, TimeWindow> processingTimeTrigger;

  public ProcessingTimeOrCountTrigger(int maxCount) {
    this.countTrigger = PurgingTrigger.of(CountTrigger.of(maxCount));
    this.processingTimeTrigger = PurgingTrigger.of(ProcessingTimeTrigger.create());
  }

  @Override
  public TriggerResult onElement(
      Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    processingTimeTrigger.onElement(element, timestamp, window, ctx);
    return countTrigger.onElement(element, timestamp, window, ctx);
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    processingTimeTrigger.clear(window, ctx);
    countTrigger.clear(window, ctx);
  }
}

On Fri, Apr 15, 2022 at 2:57
PM Jai Patel <jai.pa...@cloudkitchens.com> wrote:

> We are encountering the following error when running our Flink job. We
> have several processing windows, but it appears to be related to a
> TumblingProcessingTimeWindow. Checkpoints are failing to complete midway.
> The code block for the window is:
>
>     .keyBy(order -> getKey(order))
>     .window(TumblingProcessingTimeWindows.of(BATCH_SINK_WINDOW_DURATION))
>     .trigger(batchSinkTrigger)
>     .apply(
>         new WindowFunction<EnrichedOrder, Iterable<EnrichedOrder>, String, TimeWindow>() {
>           @Override
>           public void apply(
>               String key,
>               TimeWindow window,
>               Iterable<EnrichedOrder> values,
>               Collector<Iterable<EnrichedOrder>> out) {
>             out.collect(values);
>           }
>         })
>
> We also have another set of ProcessingTimeWindows but this one seems to be
> succeeding during checkpointing (completing through all partitions):
>
>     keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(this.windowTimeGapMs)));
>
> It looks pretty basic, but we're getting the following error.
>
> 2022-04-15 12:56:33
> java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1650052588226 window: TimeWindow{start=1650052587949, end=1650052588136}
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:338)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:312)
>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:310)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:748)
>
> This seems like something that shouldn't happen with processing time
> windows. We did find this bug which appears similar to what we're
> experiencing. https://issues.apache.org/jira/browse/FLINK-12872
>
> Thanks.
> Jai
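
For reference, the arithmetic behind the exception message can be checked with a small plain-Java sketch. It does not use Flink. The class and method names are hypothetical. It assumes (from my reading of WindowOperator, not confirmed in this thread) that a merged processing-time window is rejected when its max timestamp, taken as end - 1, is less than or equal to the current processing time. The three numbers are copied from the error above.

```java
// Plain-Java sketch, no Flink dependency. Class and method names are hypothetical.
final class MergeCheckSketch {

    // Values copied from the exception message in the quoted report.
    static final long CURRENT_PROCESSING_TIME = 1650052588226L;
    static final long WINDOW_START = 1650052587949L;
    static final long WINDOW_END = 1650052588136L;

    // Assumption: the last timestamp that still belongs to a time window is end - 1.
    static long maxTimestamp(long end) {
        return end - 1;
    }

    // Assumption: a merged processing-time window is rejected when its
    // max timestamp is not strictly after the current processing time.
    static boolean mergeRejected(long end, long currentProcessingTime) {
        return maxTimestamp(end) <= currentProcessingTime;
    }

    public static void main(String[] args) {
        long lengthMs = WINDOW_END - WINDOW_START;
        long lagMs = CURRENT_PROCESSING_TIME - maxTimestamp(WINDOW_END);

        System.out.println("window length (ms): " + lengthMs);
        System.out.println("ms past window max timestamp: " + lagMs);
        System.out.println("merge rejected: "
                + mergeRejected(WINDOW_END, CURRENT_PROCESSING_TIME));
    }
}
```

With the reported values, the merged window is 187 ms long and the processing-time clock is already 91 ms past its max timestamp, so the check fails. The stack trace goes through MergingWindowSet.addWindow, which is only used by merging assigners such as session windows, so it may be worth checking whether the exception actually originates in the ProcessingTimeSessionWindows operator rather than the tumbling one.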