Hi Tony,

Thank you for your answer, it definitely helps me understand this situation. Is there a reliable way to split the stream into 2 outputs that avoids this behaviour? Eventually I want to have 2 sinks that output different data: one is just a copy of the stream organised in session windows, and the other is metrics that I derive from the data itself.
Thanks,
Tomasz

On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
> Hi Tomasz,
>
> I think this is because .window() is a lazy operator.
> It just creates a WindowedStream object but does not create a corresponding
> operator.
> The operator is created only when you call .reduce() or .apply().
>
> rawEvents and metrics actually share the same object to create their own
> operators.
> You can see the details in WindowedStream.trigger(): it only sets
> this.trigger = trigger and then returns itself.
> Because of this, when you use the same object to create the operator for
> rawEvents, it takes the same settings for both WindowAssigner and Trigger as
> well.
> That's why the behavior changed when you changed the order.
>
> Hope this will help you.
>
> Regards,
> Tony Wei
>
> 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>>
>> Hi,
>>
>> I'm working on a custom trigger that is supposed to trigger
>> periodically and at the end of a session window. These are the main
>> methods from my trigger:
>>
>> public TriggerResult onElement(Object element, long timestamp,
>>         TimeWindow window, TriggerContext ctx) throws Exception {
>>     long currentTime = System.currentTimeMillis();
>>     if (currentTime - lastTriggerTime >= this.delay) {
>>         lastTriggerTime = currentTime;
>>         return TriggerResult.FIRE;
>>     } else {
>>         return TriggerResult.CONTINUE;
>>     }
>> }
>>
>> public TriggerResult onEventTime(long time, TimeWindow window,
>>         TriggerContext ctx) {
>>     return time == window.maxTimestamp() ?
>>         TriggerResult.FIRE :
>>         TriggerResult.CONTINUE;
>> }
>>
>> When I use this trigger in my main processing method, I'm getting
>> unexpected behaviour. This is how I use it:
>>
>> // MAIN PROCESSING
>> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>>     .map(new ParseEvent())
>>     .filter(new Filter())
>>     .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>>             @Override
>>             public long extractTimestamp(EventTags event) {
>>                 return event.receivedAt;
>>             }
>>         })
>>     .keyBy("streamKeys")
>>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>>
>> // WARNING! This has to go before periodic triggered metrics as Flink
>> // will trigger this as well if it comes second
>> DataStream<String> rawEvents = sessionWindow
>>     .reduce(new CollectRawData())
>>     .map(new ParseRawData());
>>
>> DataStream<String> metrics = sessionWindow
>>     .trigger(SessionTrigger.every(Time.milliseconds(2)))
>>     .apply(new ExtractMetrics());
>>
>>
>> This works as expected: rawEvents is calculated when the session
>> window is completed, and metrics is calculated periodically and at the
>> window's end. But if I change the order of rawEvents and metrics (the code
>> should work the same in my mind), rawEvents is also triggered
>> periodically. Is this expected to work this way? I'm not assigning
>> a periodic trigger to rawEvents. Thanks for your help.
>>
>> Kind Regards,
>> Tomasz
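
For reference, the shared-object behaviour Tony describes can be sketched in plain Java without any Flink classes. This is only a model under stated assumptions: LazyWindow, buildOperator() and the string trigger names are hypothetical stand-ins for WindowedStream, .reduce()/.apply() and the real Trigger objects. It shows why the order matters when one builder is shared, and why a separate builder per output should avoid the problem.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the behaviour discussed in this thread; no Flink classes are used.
class SharedWindowBuilderDemo {

    // Hypothetical stand-in for WindowedStream: it only holds settings.
    static final class LazyWindow {
        private String trigger = "default";

        // Like the trigger() described above: mutates this object and returns it.
        LazyWindow trigger(String newTrigger) {
            this.trigger = newTrigger;
            return this;
        }

        // Stand-in for reduce()/apply(): the operator takes whatever trigger is set right now.
        String buildOperator(String name) {
            return name + ":" + trigger;
        }
    }

    // Stand-in for calling .window(...) on the keyed stream: a fresh builder per call.
    static LazyWindow window() {
        return new LazyWindow();
    }

    // One shared builder, rawEvents built first (the order that worked).
    static List<String> sharedRawFirst() {
        LazyWindow shared = window();
        List<String> ops = new ArrayList<>();
        ops.add(shared.buildOperator("rawEvents"));
        ops.add(shared.trigger("periodic").buildOperator("metrics"));
        return ops;
    }

    // One shared builder, metrics built first: the periodic trigger leaks into rawEvents.
    static List<String> sharedMetricsFirst() {
        LazyWindow shared = window();
        List<String> ops = new ArrayList<>();
        ops.add(shared.trigger("periodic").buildOperator("metrics"));
        ops.add(shared.buildOperator("rawEvents"));
        return ops;
    }

    // A separate builder per output: the order no longer matters.
    static List<String> separateBuilders() {
        List<String> ops = new ArrayList<>();
        ops.add(window().trigger("periodic").buildOperator("metrics"));
        ops.add(window().buildOperator("rawEvents"));
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(sharedRawFirst());     // [rawEvents:default, metrics:periodic]
        System.out.println(sharedMetricsFirst()); // [metrics:periodic, rawEvents:periodic]
        System.out.println(separateBuilders());   // [metrics:periodic, rawEvents:default]
    }
}
```

Applied to the job above, the equivalent would presumably be to keep a reference to the keyed stream (the result of .keyBy("streamKeys")) and call .window(EventTimeSessionWindows.withGap(Time.minutes(5))) once for rawEvents and once for metrics, so that each output gets its own WindowedStream. This has not been verified against the job in this thread.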