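Hi Tomasz,

I would move the .window() call down into each of the two DataStreams (i.e. .window().reduce().map() for rawEvents, and likewise .window().trigger().apply() for metrics). That way they no longer share the same WindowedStream object, so the trigger set for metrics cannot leak into rawEvents.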
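To see why calling .window() once per branch helps, here is a minimal plain-Java sketch. It assumes only the behaviour described in this thread: a lazy builder whose trigger() sets a field and returns this, with the operator built at the terminal call. ToyKeyedStream, ToyWindowedStream and build() are hypothetical stand-ins, not Flink classes, and "session", "default" and "periodic" are just labels.

```java
import java.util.Arrays;

// Hypothetical toy model of the lazy, mutable builder described in this thread.
// These classes are NOT Flink's API; they only mimic the mechanism.
class ToyWindowedStream {
    private final String assigner;
    private String trigger = "default"; // label for the assigner's default trigger

    ToyWindowedStream(String assigner) {
        this.assigner = assigner;
    }

    // Like the described WindowedStream.trigger(): mutate this object and return it.
    ToyWindowedStream trigger(String trigger) {
        this.trigger = trigger;
        return this;
    }

    // Terminal call: the "operator" is built only now, from the current settings.
    String build(String name) {
        return name + "[" + assigner + ", " + trigger + "]";
    }
}

class ToyKeyedStream {
    // Every call hands out a fresh builder, so separate calls never share settings.
    ToyWindowedStream window(String assigner) {
        return new ToyWindowedStream(assigner);
    }
}

public class SharedBuilderDemo {
    // Both branches built from ONE builder; the outcome depends on definition order.
    static String[] sharedBuilder(boolean metricsFirst) {
        ToyWindowedStream shared = new ToyKeyedStream().window("session");
        String raw;
        String metrics;
        if (metricsFirst) {
            metrics = shared.trigger("periodic").build("metrics");
            raw = shared.build("rawEvents"); // picks up the trigger set just above
        } else {
            raw = shared.build("rawEvents");
            metrics = shared.trigger("periodic").build("metrics");
        }
        return new String[] {raw, metrics};
    }

    // The suggested fix: call window() separately for each branch.
    static String[] separateBuilders() {
        ToyKeyedStream keyed = new ToyKeyedStream();
        String metrics = keyed.window("session").trigger("periodic").build("metrics");
        String raw = keyed.window("session").build("rawEvents");
        return new String[] {raw, metrics};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sharedBuilder(false)));
        System.out.println(Arrays.toString(sharedBuilder(true)));
        System.out.println(Arrays.toString(separateBuilders()));
    }
}
```

With metricsFirst set to true, the shared builder yields rawEvents[session, periodic] even though rawEvents never asked for a trigger; with separate builders, rawEvents keeps rawEvents[session, default] regardless of which branch is defined first.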
Regards,
Tony Wei

2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
> Hi Tony,
>
> Thank you for your answer; it definitely helps with understanding this situation.
> Is there any reliable way to split the stream so I get 2 outputs that avoids this behaviour? Eventually I want to have 2 sinks that output different data (one being just a copy of the stream, but organised in session windows, and the other being metrics which I derive from the data itself).
>
> Thanks,
> Tomasz
>
> On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
> > Hi Tomasz,
> >
> > I think this is because .window() is a lazy operator.
> > It just creates a WindowedStream object but does not create a corresponding operator.
> > The operator is only created when you call .reduce() or .apply().
> >
> > rawEvents and metrics actually share the same object to create their own operators.
> > You can see the details in WindowedStream.trigger(): it only sets this.trigger = trigger and then returns itself.
> > Because of this, when you used the same object to create the operator for rawEvents after setting the trigger for metrics, it took the same settings for both the WindowAssigner and the Trigger.
> > That's why the behavior changed when you changed the order.
> >
> > Hope this will help you.
> >
> > Regards,
> > Tony Wei
> >
> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
> >> Hi,
> >>
> >> I'm working on a custom trigger that is supposed to trigger periodically and at the end of the session window.
> >> These are the main methods from my trigger:
> >>
> >> public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
> >>     long currentTime = System.currentTimeMillis();
> >>     if (currentTime - lastTriggerTime >= this.delay) {
> >>         lastTriggerTime = currentTime;
> >>         return TriggerResult.FIRE;
> >>     } else {
> >>         return TriggerResult.CONTINUE;
> >>     }
> >> }
> >>
> >> public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
> >>     return time == window.maxTimestamp() ?
> >>             TriggerResult.FIRE :
> >>             TriggerResult.CONTINUE;
> >> }
> >>
> >> When I use this trigger in my main processing method, I'm getting unexpected behaviour. This is how I use it:
> >>
> >> // MAIN PROCESSING
> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
> >>         .map(new ParseEvent())
> >>         .filter(new Filter())
> >>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
> >>             @Override
> >>             public long extractTimestamp(EventTags event) {
> >>                 return event.receivedAt;
> >>             }
> >>         })
> >>         .keyBy("streamKeys")
> >>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
> >>
> >> // WARNING! This has to go before periodic triggered metrics as Flink will trigger this as well
> >> // if it comes second
> >> DataStream<String> rawEvents = sessionWindow
> >>         .reduce(new CollectRawData())
> >>         .map(new ParseRawData());
> >>
> >> DataStream<String> metrics = sessionWindow
> >>         .trigger(SessionTrigger.every(Time.milliseconds(2)))
> >>         .apply(new ExtractMetrics());
> >>
> >> This works as expected: rawEvents is calculated when the session window is completed, and metrics is calculated periodically and at the window's end. But if I change the order of rawEvents and metrics (the code should work the same, in my mind), rawEvents is also triggered periodically. Is this expected to work this way?
> >> I'm not assigning a periodic trigger to rawEvents. Thanks for your help.
> >>
> >> Kind Regards,
> >> Tomasz
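The periodic branch of the onElement() method quoted above can be exercised without Flink. The sketch below is a hypothetical stand-in: it copies only the fire/continue decision, injects the clock instead of calling System.currentTimeMillis(), and assumes lastTriggerTime starts at 0 (the thread does not show its initial value). PeriodicFireCheck, Result and simulate() are illustrative names, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the periodic branch of the trigger's onElement().
// No Flink types; the clock is injected so the logic can be tested.
public class PeriodicFireCheck {
    enum Result { FIRE, CONTINUE }

    private final long delayMillis;
    private final LongSupplier clock;
    private long lastTriggerTime = 0L; // assumption: the field starts at 0

    PeriodicFireCheck(long delayMillis, LongSupplier clock) {
        this.delayMillis = delayMillis;
        this.clock = clock;
    }

    // Same decision as the quoted onElement(): fire if at least delayMillis of
    // wall-clock time has passed since the last fire, otherwise continue.
    Result onElement() {
        long currentTime = clock.getAsLong();
        if (currentTime - lastTriggerTime >= delayMillis) {
            lastTriggerTime = currentTime;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Feeds elements arriving at the given wall-clock times and records each decision.
    static List<Result> simulate(long delayMillis, long... arrivalTimes) {
        long[] now = {0L};
        PeriodicFireCheck check = new PeriodicFireCheck(delayMillis, () -> now[0]);
        List<Result> results = new ArrayList<>();
        for (long t : arrivalTimes) {
            now[0] = t;
            results.add(check.onElement());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(simulate(2, 100, 101, 102, 110));
    }
}
```

Elements arriving at 100, 101, 102 and 110 ms with a 2 ms delay give FIRE, CONTINUE, FIRE, FIRE. This branch only fires when an element arrives, so while no elements come in, onElement() alone produces no periodic firing.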