Hi Tim,

The way session windows work is by first creating a new window for every
incoming event and then merging overlapping windows. That's why you see
that the end time of a window increases with every new incoming event. I
hope this explains what you are seeing. Apart from that, I think the
SessionTrigger looks good to me.
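
For illustration only, here is a minimal plain-Java sketch of the merge
behaviour described above. It is not Flink's implementation: the class and
method names (SessionWindowMergeSketch, assignAndMerge) are made up for this
example, and it assumes that windows which merely touch are merged as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionWindowMergeSketch {

    /**
     * Gives every event its own window [t, t + gap) and merges windows that
     * overlap. Returns the merged windows as {start, end} pairs.
     * Hypothetical helper, not part of Flink.
     */
    public static long[][] assignAndMerge(long[] eventTimes, long gapMillis) {
        long[] sorted = eventTimes.clone();
        Arrays.sort(sorted);

        List<long[]> merged = new ArrayList<>();
        for (long t : sorted) {
            // Step 1: a fresh window is created for every incoming event.
            long[] fresh = {t, t + gapMillis};
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);

            // Step 2: if it overlaps (or touches, by assumption) the current
            // session window, merge the two, which moves the window end forward.
            if (last != null && fresh[0] <= last[1]) {
                last[1] = Math.max(last[1], fresh[1]);
            } else {
                merged.add(fresh);
            }
        }
        return merged.toArray(new long[0][]);
    }

    public static void main(String[] args) {
        long gap = 120_000L; // 2 minutes, as in withGap(Time.minutes(2))
        long[] events = {0L, 30_000L, 100_000L, 400_000L};
        for (long[] w : assignAndMerge(events, gap)) {
            System.out.println("window [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 2 minute gap and events at 0 s, 30 s, 100 s and 400 s, the first three
events end up in one window whose end moves from 120 s to 150 s to 220 s,
while the last event starts a new session.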

Cheers,
Till

On Fri, Apr 30, 2021 at 9:27 AM Tim Josefsson <tim.josefs...@webstep.se>
wrote:

> Thanks! I've managed to implement a working solution with the trigger API,
> but I'm not exactly sure why it works.
> I'm doing the following:
>
> DataStream<SessionSummary> summaries = env
>         .addSource(kafkaConsumer, "playerEvents(Kafka)")
>         .name("EP - Read player events from Kafka")
>         .uid("EP - Read player events from Kafka")
>         .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
>         .name("EP - Map Json to HashMap")
>         .uid("EP - Map Json to HashMap")
>         .filter((FilterFunction<HashMap>) event -> event.get(Field.SESSION_ID) != null)
>         .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>         .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>         // Assumption: the original predicate was incomplete; a null check mirrors the sessionId filter
>         .filter((FilterFunction<HashMap>) event -> event.get(Field.ACCOUNT_ID) != null)
>         .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
>         .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
>         .trigger(new SessionTrigger())
>         .aggregate(new SummaryAggregator())
>         .name("EP - Aggregate events into session summaries")
>         .uid("EP - Aggregate events into session summaries");
>
> summaries.print();
>
> With the following trigger (omitting parts of the trigger):
>
> [ ... ]
> @Override
> public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>     ValueState<Boolean> firstSeen = ctx.getPartitionedState(
>             new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
>
>     // If an end event is detected, emit the content and purge
>     if (endSession.contains(element.get(Field.EVENT_TYPE))) {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     // On the first element, schedule the periodic timer and the end-of-window timer
>     if (firstSeen.value() == null) {
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>         firstSeen.update(true);
>     }
>     logger.info("Current window end is {} for session {}", window.maxTimestamp(), element.get(Field.SESSION_ID));
>     return TriggerResult.CONTINUE;
> }
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>     if (time == window.maxTimestamp()) {
>         // The window end has been reached: emit the final result and purge
>         return TriggerResult.FIRE_AND_PURGE;
>     } else {
>         // Periodic timer: emit the current result and schedule the next firing in 10 seconds
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
>         return TriggerResult.FIRE;
>     }
> }
> [ ... ]
>
> So what I'm doing is registering a timer with
> ctx.registerProcessingTimeTimer(window.maxTimestamp()), but I only do
> this once, on the first event. Still, in testing it works the way I want:
> it fires every ten seconds and then fires and purges only after no events
> have been received for 2 minutes (as specified in the session window). Is
> the processing-time timer being updated every time the window end time is
> increased (I noticed Flink does this in the background every time a
> new event arrives)?
>
> I'm happy with my solution, just trying to figure out how things work!
>
> Cheers,
> Tim
>
>
> On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> If you use the Trigger API, then you don't have to do anything special
>> for fault tolerance. When using a ProcessFunction, you should use
>> Flink's state primitives to store your state (e.g. ValueState). This state
>> is checkpointed automatically. In case of a failure, Flink will always
>> resume from the latest successfully completed checkpoint.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <tim.josefs...@webstep.se>
>> wrote:
>>
>>> Thanks for the suggestions! I'll see if I can implement something that
>>> works!
>>> A follow-up question, more related to state. If I implement either the
>>> custom trigger or the process function, how will they handle crashes
>>> and such? For instance, if I have a checkpointing interval of 10s, will
>>> the job recover from the last checkpoint with all the summaries as they
>>> were at that point? Or do I have to implement specific ValueStates in both
>>> cases?
>>>
>>> On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> I think you could use Flink's trigger API [1] to implement a trigger
>>>> which fires when it sees a certain event or after some time.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <tim.josefs...@webstep.se>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I'm trying to figure out how to implement a window that will emit
>>>>> events at regular intervals or when a specific event is encountered.
>>>>>
>>>>> A bit of background: I have a stream of events from devices that
>>>>> send events to our system whenever a user watches a video. These events
>>>>> include a unique id (sessionId) shared by all events of the same
>>>>> session, so I want to key my stream on this. After that I want to aggregate
>>>>> all the events into a session summary, and I want to emit this summary
>>>>> every 5 minutes while still keeping it in the window (in case
>>>>> more events for that session arrive). However, if I receive an end
>>>>> event (sent by the device once a user stops watching the video), I want to
>>>>> emit the summary and remove it from the window.
>>>>>
>>>>> Is it possible to do this with one of the existing windows together
>>>>> with a trigger or in some other way? Been trying to figure it out by
>>>>> reading the docs but haven't gotten any wiser so turning to the mailing
>>>>> list for help.
>>>>>
>>>>> Best regards,
>>>>> Tim
>>>>>
>>>>
>>>
>>> --
>>>
>>> *Tim Josefsson*
>>> mobil   +46 (0) 707 81 91 12
>>> telefon +46 (0) 8 21 40 70
>>>
>>> tim.josefs...@webstep.se
>>> *webstep.se <http://www.webstep.se/>*
>>> Suttungs gränd 2
>>> 753 19 Uppsala
>>> Stockholm | Uppsala | Malmö | Sundsvall | Oslo
>>> Bergen | Stavanger | Trondheim | Kristiansand
>>>
>>
>
