Hi Tony,

Thank you for your answer; it definitely helps me understand this
situation.
Is there any reliable way to split the stream into 2 outputs that
avoids this behaviour? Eventually I want to have 2 sinks that output
different data (one being just a copy of the stream, but organised in
session windows, and the other being metrics which I derive from the
data itself).

Thanks,
Tomasz

On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
> Hi Tomasz,
>
> I think this is because .window() is a lazy operator.
> It just creates a WindowedStream object but does not create a
> corresponding operator.
> The operator is created only when you call .reduce() or .apply().
>
> rawEvents and metrics actually share the same WindowedStream object to
> create their own operators.
> You can see in WindowedStream.trigger() that it only sets
> this.trigger = trigger and then returns itself.
> Because of this, when metrics was built first and you then used the same
> object to create the operator for rawEvents, rawEvents picked up the same
> WindowAssigner and the same (periodic) Trigger as well.
> That's why changing the order changed the behavior too.
>
> Hope this will help you.
>
> Regards,
> Tony Wei
>
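The mechanism Tony describes can be reproduced without Flink. The sketch below uses hypothetical toy classes (ToyKeyedStream, ToyWindowedStream, ToyOperator; none of them are Flink API) that mimic only the two properties he points out: trigger() mutates the windowed stream and returns the same object, and the operator is built only at reduce()/apply() time from whatever settings the object holds at that moment.

```java
// Hypothetical, heavily simplified stand-ins for Flink's KeyedStream and
// WindowedStream. Only the builder behaviour described above is modelled;
// assigners and triggers are plain strings here.
final class ToyOperator {
    final String name;
    final String assigner;
    final String trigger;

    ToyOperator(String name, String assigner, String trigger) {
        this.name = name;
        this.assigner = assigner;
        this.trigger = trigger;
    }
}

final class ToyWindowedStream {
    private final String assigner;
    // Stand-in for the assigner's default trigger (EventTimeTrigger for
    // event-time session windows in Flink).
    private String trigger = "default";

    ToyWindowedStream(String assigner) {
        this.assigner = assigner;
    }

    // Like WindowedStream.trigger(): overwrite the field and return this
    // same object, so every later user of it sees the new trigger.
    ToyWindowedStream trigger(String newTrigger) {
        this.trigger = newTrigger;
        return this;
    }

    // Like reduce()/apply(): the operator is only built here, using the
    // assigner and trigger this object holds at the moment of the call.
    ToyOperator apply(String operatorName) {
        return new ToyOperator(operatorName, assigner, trigger);
    }
}

final class ToyKeyedStream {
    // Like KeyedStream.window(): each call returns a brand-new object.
    ToyWindowedStream window(String assigner) {
        return new ToyWindowedStream(assigner);
    }
}
```

With one shared object, building metrics first leaves the periodic trigger in place when rawEvents is built, which matches the behaviour in the original question; building rawEvents first captures the default trigger. Calling window() once per branch gives each branch its own object, so the order stops mattering. Since KeyedStream.window() does construct a new WindowedStream on every call, one way to split the real job would be to keep the result of .keyBy("streamKeys") in a variable and call .window(EventTimeSessionWindows.withGap(Time.minutes(5))) separately for rawEvents and for metrics, applying .trigger(...) only on the metrics branch.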
> 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>>
>> Hi,
>>
>> I'm working on a custom trigger that is supposed to fire
>> periodically and at the end of the session window. These are the main
>> methods from my trigger:
>>
>> @Override
>> public TriggerResult onElement(Object element, long timestamp,
>>         TimeWindow window, TriggerContext ctx) throws Exception {
>>     // Fire at most once every `delay` ms (processing time), on element arrival.
>>     long currentTime = System.currentTimeMillis();
>>     if (currentTime - lastTriggerTime >= this.delay) {
>>         lastTriggerTime = currentTime;
>>         return TriggerResult.FIRE;
>>     } else {
>>         return TriggerResult.CONTINUE;
>>     }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow window,
>>         TriggerContext ctx) {
>>     // Fire once more when the end-of-window timer goes off.
>>     return time == window.maxTimestamp() ?
>>             TriggerResult.FIRE :
>>             TriggerResult.CONTINUE;
>> }
>>
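To make the decisions of the two trigger methods above easy to check, here is a plain-Java model of just that logic, a sketch that assumes nothing beyond the code shown. PeriodicSessionTriggerModel and its Result enum are hypothetical stand-ins for the Flink trigger and TriggerResult, and the clock is passed in as an argument instead of calling System.currentTimeMillis(), so the behaviour is deterministic.

```java
// Hypothetical plain-Java model of the two decisions in the trigger above.
// The current time is passed in instead of read from System.currentTimeMillis(),
// so the behaviour can be checked deterministically.
final class PeriodicSessionTriggerModel {
    enum Result { FIRE, CONTINUE }

    private final long delayMillis;
    // Starts at 0, like an uninitialised long field in the original trigger.
    private long lastTriggerTime;

    PeriodicSessionTriggerModel(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Mirrors onElement(): fire at most once per delay, and only when an element arrives.
    Result onElement(long currentTime) {
        if (currentTime - lastTriggerTime >= delayMillis) {
            lastTriggerTime = currentTime;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Mirrors onEventTime(): fire only for the timer at the window's max timestamp.
    Result onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }
}
```

With a 1000 ms delay, elements arriving at 5000, 5500 and 6000 ms give FIRE, CONTINUE, FIRE. Two properties of this design carry over to the Flink version: periodic firings only happen when an element arrives, so a quiet session never fires early; and lastTriggerTime is a single field. In Flink the same Trigger instance is used for all keys and windows handled by one parallel operator instance, so per-window state like this is normally kept through ctx.getPartitionedState(...) rather than in a field.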
>> When I use this trigger in my main processing method, I'm getting
>> unexpected behaviour. This is how I use it:
>>
>> // MAIN PROCESSING
>> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
>>         .map(new ParseEvent())
>>         .filter(new Filter())
>>         .assignTimestampsAndWatermarks(
>>                 new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
>>                     @Override
>>                     public long extractTimestamp(EventTags event) {
>>                         return event.receivedAt;
>>                     }
>>                 })
>>         .keyBy("streamKeys")
>>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>>
>> // WARNING! This has to come before the periodically triggered metrics;
>> // if it comes second, Flink triggers it periodically as well.
>> DataStream<String> rawEvents = sessionWindow
>>         .reduce(new CollectRawData())
>>         .map(new ParseRawData());
>>
>> DataStream<String> metrics = sessionWindow
>>         .trigger(SessionTrigger.every(Time.milliseconds(2)))
>>         .apply(new ExtractMetrics());
>>
>>
>> This works as expected: rawEvents is calculated when the session
>> window is completed, and metrics is calculated periodically and at the
>> window's end. But if I swap the order of rawEvents and metrics (which
>> I'd expect to make no difference), rawEvents is also triggered
>> periodically. Is this the expected behaviour? I'm not assigning the
>> periodic trigger to rawEvents. Thanks for your help.
>>
>> Kind Regards,
>> Tomasz
>
>
