Here's our custom trigger.  We considered switching to
ProcessingTimeoutTrigger.of(CountTrigger.of(100), Duration.ofMinutes(1)),
but I'm not sure it will fire properly when the window closes.

Thanks.
Jai

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link Trigger} that fires once the current system time passes the end of the window to
 * which a pane belongs, or once the number of elements in the pane reaches {@code maxCount}.
 *
 * <p>Note that all elements in the window are purged once this trigger fires.
 */
// TODO: This class should be replaced by ProcessingTimeoutTrigger
//  once we have updated to Flink 1.12.0.
//  https://issues.apache.org/jira/browse/FLINK-17058
public class ProcessingTimeOrCountTrigger extends Trigger<Object, TimeWindow> {

  private final Trigger<Object, TimeWindow> countTrigger;
  private final Trigger<Object, TimeWindow> processingTimeTrigger;

  public ProcessingTimeOrCountTrigger(int maxCount) {
    this.countTrigger = PurgingTrigger.of(CountTrigger.of(maxCount));
    this.processingTimeTrigger = PurgingTrigger.of(ProcessingTimeTrigger.create());
  }

  @Override
  public TriggerResult onElement(
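      Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    // Called only for its side effect: registers a processing-time timer for the window end.
    processingTimeTrigger.onElement(element, timestamp, window, ctx);
    // The purging count trigger decides whether this element fires the window.
    return countTrigger.onElement(element, timestamp, window, ctx);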
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    processingTimeTrigger.clear(window, ctx);
    countTrigger.clear(window, ctx);
  }
}

On Fri, Apr 15, 2022 at 2:57 PM Jai Patel <jai.pa...@cloudkitchens.com>
wrote:

> We are encountering the following error when running our Flink job.  We
> have several processing windows, but it appears to be related to a
> TumblingProcessingTimeWindow.  Checkpoints are failing to complete midway.
> The code block for the window is:
>
>         .keyBy(order -> getKey(order))
>         .window(TumblingProcessingTimeWindows.of(BATCH_SINK_WINDOW_DURATION))
>         .trigger(batchSinkTrigger)
>         .apply(
>             new WindowFunction<EnrichedOrder, Iterable<EnrichedOrder>, String, TimeWindow>() {
>               @Override
>               public void apply(
>                   String key,
>                   TimeWindow window,
>                   Iterable<EnrichedOrder> values,
>                   Collector<Iterable<EnrichedOrder>> out) {
>                 out.collect(values);
>               }
>             })
>
> We also have another set of ProcessingTimeWindows but this one seems to be
> succeeding during checkpointing (completing through all partitions):
>
>         keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(this.windowTimeGapMs)));
>
> It looks pretty basic, but we're getting the following error.
>
> 2022-04-15 12:56:33
> java.lang.UnsupportedOperationException: The end timestamp of a
> processing-time window cannot become earlier than the current processing
> time by merging. Current processing time: 1650052588226 window:
> TimeWindow{start=1650052587949, end=1650052588136}
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:338)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:312)
>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:310)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:748)
>
> This seems like something that shouldn't happen with processing time
> windows.  We did find this bug which appears similar to what we're
> experiencing.  https://issues.apache.org/jira/browse/FLINK-12872
>
> Thanks.
> Jai
>
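
As a sanity check on the numbers in the exception message above, here is a
small plain-Java sketch with no Flink dependency.  It assumes the operator
rejects a merged processing-time window when the window's last timestamp
(end - 1, the TimeWindow convention) is not later than the current
processing time.  The class and method names are hypothetical; this is not
Flink's actual code.

```java
/** Plain-Java model of the comparison described by the exception message (an assumption). */
final class ProcessingTimeMergeCheck {

    private ProcessingTimeMergeCheck() {}

    /** Last timestamp that still belongs to a window ending (exclusively) at windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Assumed rule: the window is rejected once its last timestamp is not in the future. */
    static boolean isAlreadyExpired(long windowEnd, long currentProcessingTime) {
        return maxTimestamp(windowEnd) <= currentProcessingTime;
    }

    public static void main(String[] args) {
        // Values copied from the exception message.
        long start = 1650052587949L;
        long end = 1650052588136L;
        long now = 1650052588226L;

        System.out.println("window length (ms): " + (end - start));
        System.out.println("ms past last window timestamp: " + (now - maxTimestamp(end)));
        System.out.println("already expired: " + isAlreadyExpired(end, now));
    }
}
```

With the values from the log, the window is 187 ms long and the clock was
already 91 ms past the window's last timestamp when the merge was attempted.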
