Hi Tomasz,

In my opinion, I would move .window() function down to these two
DataStream. (rawEvent.window().reduce().map(), and so does metrics)
It makes sure that they won't share the same constructor.

Regards,
Tony Wei

2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:

> Hi Tony,
>
> Thank you for your answer, it definitely helps with understanding this
> situation.
> Is there any reliable way to split the stream so I get 2 outputs that
> avoids this behaviour? Eventually I want to have 2 sinks that output
> different data (one being just a copy of the stream, but organised in
> session windows and the other being metrics which I derive from the
> data itself).
>
> Thanks,
> Tomasz
>
> On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
> > Hi Tomasz,
> >
> > I think this is because .window() is a lazy operator.
> > It just creates a WindowedStream class but not create a corresponding
> > operator.
> > The operator will be created after you called .reduce() and .apply().
> >
> > rawEvents and metrics actually shared the same object to create their own
> > operators.
> > You can see the detail in WindowedStream.trigger() that it only set
> > this.trigger = trigger and then return iteself.
> > Because of this, when you used the same object to create operator for
> > rawEvents, it took the same settings for both WindowAssigner and Trigger
> as
> > well.
> > That's why you changed the order then the behavior changed as well.
> >
> > Hope this will help you.
> >
> > Regards,
> > Tony Wei
> >
> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com
> >:
> >>
> >> Hi,
> >>
> >> I'm working on a custom trigger that is supposed to trigger
> >> periodically and at the end of session window. These are the main
> >> methods from my trigger:
> >>
> >> public TriggerResult onElement(Object element, long timestamp,
> >> TimeWindow window, TriggerContext ctx) throws Exception {
> >>     long currentTime = System.currentTimeMillis();
> >>     if (currentTime - lastTriggerTime >= this.delay) {
> >>         lastTriggerTime = currentTime;
> >>         return TriggerResult.FIRE;
> >>     } else {
> >>         return TriggerResult.CONTINUE;
> >>     }
> >> }
> >>
> >> public TriggerResult onEventTime(long time, TimeWindow window,
> >> TriggerContext ctx) {
> >>     return time == window.maxTimestamp() ?
> >>             TriggerResult.FIRE :
> >>             TriggerResult.CONTINUE;
> >> }
> >>
> >> When I use this trigger in my main processing method, I'm getting
> >> unexpected behaviour. This is how I use it:
> >>
> >> // MAIN PROCESSING
> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
> >>               .map(new ParseEvent())
> >>               .filter(new Filter())
> >>               .assignTimestampsAndWatermarks(new
> >> BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
> >>                   @Override
> >>                   public long extractTimestamp(EventTags event) {
> >>                       return event.receivedAt;
> >>                   }
> >>               })
> >>               .keyBy("streamKeys")
> >>               .window(EventTimeSessionWindows.
> withGap(Time.minutes(5)));
> >>
> >> // WARNING! This has to go before periodic triggered metrics as Flink
> >> will trigger this as well
> >>       // if it comes second
> >>       DataStream<String> rawEvents = sessionWindow
> >>               .reduce(new CollectRawData())
> >>               .map(new ParseRawData());
> >>
> >> DataStream<String> metrics = sessionWindow
> >>               .trigger(SessionTrigger.every(Time.milliseconds(2)))
> >>               .apply(new ExtractMetrics());
> >>
> >>
> >> This works as expected, rawEvents is calculated when the session
> >> window is completed and metrics is calculated periodically and at the
> >> windows end. But if I change the order of rawEvents and metrics (code
> >> should work the same in my mind), rawEvents is also triggered
> >> periodically. Is this expected to work this way? I'm not assigning
> >> periodic trigger to rawEvents. Thanks for your help.
> >>
> >> Kind Regards,
> >> Tomasz
> >
> >
>

Reply via email to