Hi Tony,

Thank you for this thorough explanation. Helps a lot!

Kind Regards,
Tomasz

On 23 August 2017 at 11:30, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Tomasz,
>
> Actually, the window is not a real operator shared by the operators that
> reduce() and apply() create.
> Flink builds each WindowOperator by binding the settings from window(),
> trigger() and evictor() together with the WindowFunction.
> So the upstream operator sends its elements to two downstream operators,
> and each of them keeps its own window state.
> For more details, you can refer to this blog
> (https://flink.apache.org/news/2015/12/04/Introducing-windows.html)
>
> Therefore, in terms of processing cost, my modified version is no different
> from yours.
>
> Regards,
> Tony Wei
>
> 2017-08-23 18:11 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>>
>> Hi Tony,
>>
>> Won't that increase the amount of processing Flink has to do? It would
>> have to window twice, right?
>>
>> Thanks,
>> Tomasz
>>
>> On 23 August 2017 at 11:02, Tony Wei <tony19920...@gmail.com> wrote:
>> > Hi Tomasz,
>> >
>> > I would move the .window() call down into each of the two DataStream
>> > definitions
>> > (.window().reduce().map() for rawEvents, and likewise for metrics).
>> > That makes sure they don't share the same WindowedStream object.
>> >
>> > Regards,
>> > Tony Wei
>> >
>> > 2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki
>> > <dobrzycki.tom...@gmail.com>:
>> >>
>> >> Hi Tony,
>> >>
>> >> Thank you for your answer, it definitely helps with understanding this
>> >> situation.
>> >> Is there any reliable way to split the stream so I get 2 outputs that
>> >> avoids this behaviour? Eventually I want to have 2 sinks that output
>> >> different data (one being just a copy of the stream, but organised in
>> >> session windows and the other being metrics which I derive from the
>> >> data itself).
>> >>
>> >> Thanks,
>> >> Tomasz
>> >>
>> >> On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
>> >> > Hi Tomasz,
>> >> >
>> >> > I think this is because .window() is lazy.
>> >> > It only creates a WindowedStream object; it does not create a
>> >> > corresponding operator.
>> >> > The operator is created only when you call .reduce() or .apply().
>> >> >
>> >> > rawEvents and metrics actually share the same WindowedStream object to
>> >> > create their own operators.
>> >> > You can see the details in WindowedStream.trigger(): it only sets
>> >> > this.trigger = trigger and then returns itself.
>> >> > Because of this, when you then use the same object to create the operator
>> >> > for rawEvents, it picks up the same settings for both the WindowAssigner
>> >> > and the Trigger.
>> >> > That's why the behavior changes when you change the order.
>> >> >
>> >> > Hope this will help you.
>> >> >
>> >> > Regards,
>> >> > Tony Wei
>> >> >
>> >> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki
>> >> > <dobrzycki.tom...@gmail.com>:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I'm working on a custom trigger that is supposed to fire
>> >> >> periodically and at the end of the session window. These are the main
>> >> >> methods from my trigger:
>> >> >>
>> >> >> public TriggerResult onElement(Object element, long timestamp,
>> >> >> TimeWindow window, TriggerContext ctx) throws Exception {
>> >> >>     long currentTime = System.currentTimeMillis();
>> >> >>     if (currentTime - lastTriggerTime >= this.delay) {
>> >> >>         lastTriggerTime = currentTime;
>> >> >>         return TriggerResult.FIRE;
>> >> >>     } else {
>> >> >>         return TriggerResult.CONTINUE;
>> >> >>     }
>> >> >> }
>> >> >>
>> >> >> public TriggerResult onEventTime(long time, TimeWindow window,
>> >> >> TriggerContext ctx) {
>> >> >>     return time == window.maxTimestamp() ?
>> >> >>             TriggerResult.FIRE :
>> >> >>             TriggerResult.CONTINUE;
>> >> >> }
>> >> >>
>> >> >> When I use this trigger in my main processing method, I'm getting
>> >> >> unexpected behaviour. This is how I use it:
>> >> >>
>> >> >> // MAIN PROCESSING
>> >> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>> >> >>         .map(new ParseEvent())
>> >> >>         .filter(new Filter())
>> >> >>         .assignTimestampsAndWatermarks(
>> >> >>                 new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>> >> >>                     @Override
>> >> >>                     public long extractTimestamp(EventTags event) {
>> >> >>                         return event.receivedAt;
>> >> >>                     }
>> >> >>                 })
>> >> >>         .keyBy("streamKeys")
>> >> >>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>> >> >>
>> >> >> // WARNING! This has to go before periodic triggered metrics, as Flink
>> >> >> // will trigger this as well if it comes second
>> >> >> DataStream<String> rawEvents = sessionWindow
>> >> >>         .reduce(new CollectRawData())
>> >> >>         .map(new ParseRawData());
>> >> >>
>> >> >> DataStream<String> metrics = sessionWindow
>> >> >>         .trigger(SessionTrigger.every(Time.milliseconds(2)))
>> >> >>         .apply(new ExtractMetrics());
>> >> >>
>> >> >>
>> >> >> This works as expected: rawEvents is calculated when the session
>> >> >> window is completed, and metrics is calculated periodically and at the
>> >> >> window's end. But if I change the order of rawEvents and metrics (the
>> >> >> code should work the same in my mind), rawEvents is also triggered
>> >> >> periodically. Is this expected to work this way? I'm not assigning a
>> >> >> periodic trigger to rawEvents. Thanks for your help.
>> >> >>
>> >> >> Kind Regards,
>> >> >> Tomasz
>> >> >
>> >> >
>> >
>> >
>
>
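
The order dependence discussed in this thread can be reproduced without Flink. The sketch below is plain Java and only illustrative: WindowedStreamSketch, buildOperator() and the trigger names are hypothetical stand-ins, not Flink classes. It assumes, as described above, that trigger() stores the trigger on the object and returns the same object, and that the operator reads that setting when reduce() or apply() is called.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedWindowSketch {

    /** Hypothetical stand-in for a mutable, builder-style WindowedStream. */
    static final class WindowedStreamSketch {
        private String trigger = "default";

        // Stores the trigger on this object and returns the same object,
        // mirroring the behaviour described for WindowedStream.trigger().
        WindowedStreamSketch trigger(String trigger) {
            this.trigger = trigger;
            return this;
        }

        // Stand-in for reduce()/apply(): the operator picks up whatever
        // trigger is set on the object at the moment it is built.
        String buildOperator(String name) {
            return name + " uses " + trigger + " trigger";
        }
    }

    /**
     * Builds the two "operators" in the given order, either from one shared
     * window object or from a separate window object per stream.
     */
    static List<String> build(boolean metricsFirst, boolean separateWindows) {
        WindowedStreamSketch shared = new WindowedStreamSketch();
        WindowedStreamSketch rawWindow = separateWindows ? new WindowedStreamSketch() : shared;
        WindowedStreamSketch metricsWindow = separateWindows ? new WindowedStreamSketch() : shared;

        List<String> operators = new ArrayList<>();
        if (metricsFirst) {
            operators.add(metricsWindow.trigger("periodic").buildOperator("metrics"));
            operators.add(rawWindow.buildOperator("rawEvents"));
        } else {
            operators.add(rawWindow.buildOperator("rawEvents"));
            operators.add(metricsWindow.trigger("periodic").buildOperator("metrics"));
        }
        return operators;
    }

    public static void main(String[] args) {
        System.out.println(build(false, false)); // shared object, rawEvents first
        System.out.println(build(true, false));  // shared object, metrics first
        System.out.println(build(true, true));   // separate objects, metrics first
    }
}
```

With a shared object, rawEvents keeps the default trigger only when it is built first. With one window object per stream, the order no longer matters, which is the effect of moving .window() down into each stream definition.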
