Hi,

I'm working on a custom trigger that is supposed to trigger
periodically and at the end of session window. These are the main
methods from my trigger:

public TriggerResult onElement(Object element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastTriggerTime >= this.delay) {
        lastTriggerTime = currentTime;
        return TriggerResult.FIRE;
    } else {
        return TriggerResult.CONTINUE;
    }
}

public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) {
    return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}

When I use this trigger in my main processing method, I'm getting
unexpected behaviour. This is how I use it:

// MAIN PROCESSING
WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
              .map(new ParseEvent())
              .filter(new Filter())
              .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
                  @Override
                  public long extractTimestamp(EventTags event) {
                      return event.receivedAt;
                  }
              })
              .keyBy("streamKeys")
              .window(EventTimeSessionWindows.withGap(Time.minutes(5)));

// WARNING! This has to go before periodic triggered metrics as Flink
will trigger this as well
      // if it comes second
      DataStream<String> rawEvents = sessionWindow
              .reduce(new CollectRawData())
              .map(new ParseRawData());

DataStream<String> metrics = sessionWindow
              .trigger(SessionTrigger.every(Time.milliseconds(2)))
              .apply(new ExtractMetrics());


This works as expected, rawEvents is calculated when the session
window is completed and metrics is calculated periodically and at the
windows end. But if I change the order of rawEvents and metrics (code
should work the same in my mind), rawEvents is also triggered
periodically. Is this expected to work this way? I'm not assigning
periodic trigger to rawEvents. Thanks for your help.

Kind Regards,
Tomasz

Reply via email to