Hi Tomasz,

I think this is because .window() is lazy.
It only creates a WindowedStream object; it does not create the
corresponding operator.
The operator is created only when you call .reduce() or .apply().

rawEvents and metrics actually share the same WindowedStream object when
creating their own operators.
You can see in WindowedStream.trigger() that it only sets
this.trigger = trigger and then returns itself.
Because of this, when metrics comes first, the trigger has already been set
on the shared object by the time you create the operator for rawEvents, so
rawEvents picks up the same WindowAssigner and Trigger as well.
That's why the behavior changes when you change the order.
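
To illustrate, here is a minimal plain-Java sketch of the effect. It does not
use Flink at all: WindowedStreamModel is a hypothetical stand-in that only
mimics two things, namely that trigger() sets a field and returns this, and
that reduce()/apply() read whatever trigger is set at the moment they are
called. The names and the string "operators" are assumptions made for the
example.

```java
final class SharedTriggerDemo {

    /** Hypothetical, simplified model of a windowed stream (not Flink's class). */
    static final class WindowedStreamModel {
        private String trigger = "default";

        // Mimics the "set the field and return this" behaviour described above.
        WindowedStreamModel trigger(String newTrigger) {
            this.trigger = newTrigger;
            return this;
        }

        // The "operator" is built here and captures the trigger set right now.
        String reduce() {
            return "rawEvents:" + trigger;
        }

        String apply() {
            return "metrics:" + trigger;
        }
    }

    /** Both branches are built from one shared object, as in the original job. */
    static String[] sharedObject(boolean metricsFirst) {
        WindowedStreamModel sessionWindow = new WindowedStreamModel();
        String raw;
        String metrics;
        if (metricsFirst) {
            metrics = sessionWindow.trigger("periodic").apply();
            raw = sessionWindow.reduce(); // sees the trigger set one line above
        } else {
            raw = sessionWindow.reduce(); // built before the trigger is changed
            metrics = sessionWindow.trigger("periodic").apply();
        }
        return new String[] {raw, metrics};
    }

    /** Each branch gets its own object, so the order no longer matters. */
    static String[] separateObjects() {
        String metrics = new WindowedStreamModel().trigger("periodic").apply();
        String raw = new WindowedStreamModel().reduce();
        return new String[] {raw, metrics};
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", sharedObject(false)));
        System.out.println(String.join(", ", sharedObject(true)));
        System.out.println(String.join(", ", separateObjects()));
    }
}
```

In your job, a likely fix along the same lines would be to keep the result of
.keyBy("streamKeys") in a variable and call .window(...) once per branch, so
that rawEvents and metrics each get their own WindowedStream.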

Hope this will help you.

Regards,
Tony Wei

2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:

> Hi,
>
> I'm working on a custom trigger that is supposed to trigger
> periodically and at the end of session window. These are the main
> methods from my trigger:
>
> public TriggerResult onElement(Object element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
>     long currentTime = System.currentTimeMillis();
>     if (currentTime - lastTriggerTime >= this.delay) {
>         lastTriggerTime = currentTime;
>         return TriggerResult.FIRE;
>     } else {
>         return TriggerResult.CONTINUE;
>     }
> }
>
> public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) {
>     return time == window.maxTimestamp() ?
>             TriggerResult.FIRE :
>             TriggerResult.CONTINUE;
> }
>
> When I use this trigger in my main processing method, I'm getting
> unexpected behaviour. This is how I use it:
>
> // MAIN PROCESSING
> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>               .map(new ParseEvent())
>               .filter(new Filter())
>               .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>                   @Override
>                   public long extractTimestamp(EventTags event) {
>                       return event.receivedAt;
>                   }
>               })
>               .keyBy("streamKeys")
>               .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>
> // WARNING! This has to go before periodic triggered metrics as Flink
> // will trigger this as well if it comes second
>       DataStream<String> rawEvents = sessionWindow
>               .reduce(new CollectRawData())
>               .map(new ParseRawData());
>
> DataStream<String> metrics = sessionWindow
>               .trigger(SessionTrigger.every(Time.milliseconds(2)))
>               .apply(new ExtractMetrics());
>
>
> This works as expected, rawEvents is calculated when the session
> window is completed and metrics is calculated periodically and at the
> windows end. But if I change the order of rawEvents and metrics (code
> should work the same in my mind), rawEvents is also triggered
> periodically. Is this expected to work this way? I'm not assigning
> periodic trigger to rawEvents. Thanks for your help.
>
> Kind Regards,
> Tomasz
>
