Hi Tony,

Thank you for this thorough explanation. Helps a lot!
Kind Regards,
Tomasz

On 23 August 2017 at 11:30, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Tomasz,
>
> Actually, the window is not a real operator shared by the operators created
> by the reduce() and apply() functions.
> Flink implements WindowOperator by binding window(), trigger() and evictor()
> together with the WindowFunction.
> It is more like the preceding operator sends elements to the two following
> operators, and each of them creates its own window state.
> For more details, you can refer to this blog post
> (https://flink.apache.org/news/2015/12/04/Introducing-windows.html)
>
> Therefore, my modified version does no more windowing work than yours.
>
> Regards,
> Tony Wei
>
> 2017-08-23 18:11 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>>
>> Hi Tony,
>>
>> Won't that increase the amount of processing Flink has to do? It would
>> have to window twice, right?
>>
>> Thanks,
>> Tomasz
>>
>> On 23 August 2017 at 11:02, Tony Wei <tony19920...@gmail.com> wrote:
>> > Hi Tomasz,
>> >
>> > In my opinion, you should move the .window() call down into each of
>> > these two DataStreams
>> > (rawEvents.window().reduce().map(), and likewise for metrics).
>> > This makes sure that they won't share the same WindowedStream object.
>> >
>> > Regards,
>> > Tony Wei
>> >
>> > 2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki
>> > <dobrzycki.tom...@gmail.com>:
>> >>
>> >> Hi Tony,
>> >>
>> >> Thank you for your answer, it definitely helps with understanding this
>> >> situation.
>> >> Is there any reliable way to split the stream so I get 2 outputs that
>> >> avoids this behaviour? Eventually I want to have 2 sinks that output
>> >> different data (one being just a copy of the stream, but organised in
>> >> session windows, and the other being metrics which I derive from the
>> >> data itself).
>> >>
>> >> Thanks,
>> >> Tomasz
>> >>
>> >> On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
>> >> > Hi Tomasz,
>> >> >
>> >> > I think this is because .window() is a lazy operator.
>> >> > It just creates a WindowedStream object but does not create a
>> >> > corresponding operator.
>> >> > The operator is created only when you call .reduce() or .apply().
>> >> >
>> >> > rawEvents and metrics actually share the same object to create their
>> >> > own operators.
>> >> > You can see in WindowedStream.trigger() that it only sets
>> >> > this.trigger = trigger and then returns itself.
>> >> > Because of this, when metrics is defined first and you then use the
>> >> > same object to create the operator for rawEvents, it picks up the same
>> >> > WindowAssigner and Trigger settings as well.
>> >> > That's why the behaviour changed when you changed the order.
>> >> >
>> >> > Hope this will help you.
>> >> >
>> >> > Regards,
>> >> > Tony Wei
>> >> >
>> >> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki
>> >> > <dobrzycki.tom...@gmail.com>:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I'm working on a custom trigger that is supposed to fire
>> >> >> periodically and at the end of a session window. These are the main
>> >> >> methods from my trigger:
>> >> >>
>> >> >> public TriggerResult onElement(Object element, long timestamp,
>> >> >>         TimeWindow window, TriggerContext ctx) throws Exception {
>> >> >>     long currentTime = System.currentTimeMillis();
>> >> >>     if (currentTime - lastTriggerTime >= this.delay) {
>> >> >>         lastTriggerTime = currentTime;
>> >> >>         return TriggerResult.FIRE;
>> >> >>     } else {
>> >> >>         return TriggerResult.CONTINUE;
>> >> >>     }
>> >> >> }
>> >> >>
>> >> >> public TriggerResult onEventTime(long time, TimeWindow window,
>> >> >>         TriggerContext ctx) {
>> >> >>     return time == window.maxTimestamp() ?
>> >> >>         TriggerResult.FIRE :
>> >> >>         TriggerResult.CONTINUE;
>> >> >> }
>> >> >>
>> >> >> When I use this trigger in my main processing method, I'm getting
>> >> >> unexpected behaviour.
>> >> >> This is how I use it:
>> >> >>
>> >> >> // MAIN PROCESSING
>> >> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>> >> >>     .map(new ParseEvent())
>> >> >>     .filter(new Filter())
>> >> >>     .assignTimestampsAndWatermarks(
>> >> >>         new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>> >> >>             @Override
>> >> >>             public long extractTimestamp(EventTags event) {
>> >> >>                 return event.receivedAt;
>> >> >>             }
>> >> >>         })
>> >> >>     .keyBy("streamKeys")
>> >> >>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>> >> >>
>> >> >> // WARNING! This has to go before the periodically triggered metrics,
>> >> >> // as Flink will trigger this periodically as well if it comes second
>> >> >> DataStream<String> rawEvents = sessionWindow
>> >> >>     .reduce(new CollectRawData())
>> >> >>     .map(new ParseRawData());
>> >> >>
>> >> >> DataStream<String> metrics = sessionWindow
>> >> >>     .trigger(SessionTrigger.every(Time.milliseconds(2)))
>> >> >>     .apply(new ExtractMetrics());
>> >> >>
>> >> >> This works as expected: rawEvents is calculated when the session
>> >> >> window is completed, and metrics is calculated periodically and at
>> >> >> the window's end. But if I swap the order of rawEvents and metrics
>> >> >> (in my mind the code should work the same), rawEvents is also
>> >> >> triggered periodically. Is this expected to work this way? I'm not
>> >> >> assigning the periodic trigger to rawEvents. Thanks for your help.
>> >> >>
>> >> >> Kind Regards,
>> >> >> Tomasz
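The ordering effect Tony describes can be reproduced without Flink at all. The following is a minimal sketch, not Flink code: `WindowedStreamModel`, `sharedBuilder` and `builderPerBranch` are hypothetical stand-ins that only mimic the two properties described in this thread, namely that `trigger()` overwrites a field and returns the same object, and that `reduce()`/`apply()` build their operator from whatever trigger is set at that moment. The strings `"default"` and `"periodic"` are placeholders for the window assigner's default trigger and `SessionTrigger.every(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the builder behaviour described in the thread.
// This is NOT Flink's WindowedStream; it only mimics two properties:
//   1. trigger() overwrites a field and returns the same object;
//   2. reduce()/apply() build an "operator" from the trigger set at that moment.
public class SharedWindowedStreamDemo {

    static final String DEFAULT_TRIGGER = "default";   // placeholder for the assigner's default trigger
    static final String PERIODIC_TRIGGER = "periodic"; // placeholder for SessionTrigger.every(...)

    static final class WindowedStreamModel {
        private String trigger = DEFAULT_TRIGGER;

        WindowedStreamModel trigger(String newTrigger) {
            this.trigger = newTrigger; // mutates the shared builder
            return this;               // returns itself, not a copy
        }

        // Both methods "create an operator" by capturing the current trigger.
        String reduce(String name) {
            return name + " -> " + trigger;
        }

        String apply(String name) {
            return name + " -> " + trigger;
        }
    }

    // Both branches are built from ONE shared builder, in the given order.
    static List<String> sharedBuilder(boolean metricsFirst) {
        WindowedStreamModel sessionWindow = new WindowedStreamModel();
        List<String> operators = new ArrayList<>();
        if (metricsFirst) {
            operators.add(sessionWindow.trigger(PERIODIC_TRIGGER).apply("metrics"));
            operators.add(sessionWindow.reduce("rawEvents")); // sees the periodic trigger
        } else {
            operators.add(sessionWindow.reduce("rawEvents")); // captured before trigger() ran
            operators.add(sessionWindow.trigger(PERIODIC_TRIGGER).apply("metrics"));
        }
        return operators;
    }

    // Tony's suggestion: call window() once per branch, i.e. a fresh builder each time.
    static List<String> builderPerBranch() {
        List<String> operators = new ArrayList<>();
        operators.add(new WindowedStreamModel().trigger(PERIODIC_TRIGGER).apply("metrics"));
        operators.add(new WindowedStreamModel().reduce("rawEvents"));
        return operators;
    }

    public static void main(String[] args) {
        System.out.println("rawEvents first: " + sharedBuilder(false));
        System.out.println("metrics first:   " + sharedBuilder(true));
        System.out.println("per branch:      " + builderPerBranch());
    }
}
```

Running `main` prints `[rawEvents -> default, metrics -> periodic]`, then `[metrics -> periodic, rawEvents -> periodic]`, then `[metrics -> periodic, rawEvents -> default]`: only the shared builder with metrics defined first leaks the periodic trigger into rawEvents. In the real pipeline, the per-branch variant corresponds to calling `.window(EventTimeSessionWindows.withGap(Time.minutes(5)))` on the keyed stream separately for rawEvents and for metrics; as Tony points out, each branch gets its own window operator either way, so this does not add windowing work.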