Hi Tim,

Session windows work by first creating a new window for every incoming event and then merging overlapping windows. That's why you see the end time of a window increase with every new incoming event. I hope this explains what you are seeing. Apart from that, the SessionTrigger looks good to me.
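
The merging step can be illustrated with a minimal plain-Java simulation of a session assigner. This is not Flink code: SessionMergeDemo, Window, add and firstTimeSeen are hypothetical names, and the event timestamps are made up. Each event first gets its own window [t, t + gap). Every existing window that overlaps or touches it is then folded into it, using the same overlap rule and covering window as Flink's TimeWindow. The 120000 ms gap mirrors the 2-minute gap passed to ProcessingTimeSessionWindows in the quoted pipeline. The seen set stands in for per-window trigger state such as firstSeen. The sketch assumes that trigger state is scoped to the current merged window, which is how WindowOperator appears to work. Under that assumption every merged window starts with empty state, so onElement would re-register both timers on each new event. As far as I can tell, timers registered for windows that were merged away are ignored when they fire.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Plain-Java simulation of session-window merging (NOT Flink's implementation).
 * Each element first gets its own window [t, t + gap); every existing window that
 * overlaps it is then merged into one covering window.
 */
public class SessionMergeDemo {

    /** Half-open interval [start, end), loosely modelled on Flink's TimeWindow. */
    public static final class Window {
        final long start;
        final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Same rule as TimeWindow#intersects: overlapping or touching windows merge.
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        // Smallest window covering both, like TimeWindow#cover.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window w = (Window) o;
            return w.start == start && w.end == end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    private final long gap;
    private final List<Window> windows = new ArrayList<>();
    // Hypothetical stand-in for per-window trigger state such as "firstSeen".
    private final Set<Window> seen = new HashSet<>();

    public SessionMergeDemo(long gap) {
        this.gap = gap;
    }

    /** Adds an element with timestamp t and returns the (possibly merged) window it lands in. */
    public Window add(long t) {
        Window merged = new Window(t, t + gap);
        Iterator<Window> it = windows.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.intersects(merged)) {
                merged = merged.cover(w); // the merged window's end moves forward
                it.remove();
            }
        }
        windows.add(merged);
        return merged;
    }

    /** True the first time a window is seen, i.e. while its per-window state is still empty. */
    public boolean firstTimeSeen(Window w) {
        return seen.add(w);
    }

    public int windowCount() {
        return windows.size();
    }

    public static void main(String[] args) {
        SessionMergeDemo sessions = new SessionMergeDemo(120_000L); // 2-minute gap
        for (long t : new long[] {0L, 30_000L, 90_000L, 400_000L}) {
            Window w = sessions.add(t);
            System.out.println("event at " + t + " -> window " + w
                    + ", state empty: " + sessions.firstTimeSeen(w));
        }
    }
}
```

Running main should print the windows [0, 120000), [0, 150000) and [0, 210000), followed by a separate session [400000, 520000). Each window is reported with empty state. Separately, Flink only accepts a custom trigger on a merging assigner such as ProcessingTimeSessionWindows if the trigger's canMerge() returns true. The omitted parts of SessionTrigger therefore presumably include canMerge() and onMerge().
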

Cheers,
Till

On Fri, Apr 30, 2021 at 9:27 AM Tim Josefsson <tim.josefs...@webstep.se> wrote:

> Thanks! I've managed to implement a working solution with the trigger API,
> but I'm not exactly sure why it works.
> I'm doing the following:
>
> DataStream<SessionSummary> summaries = env
>     .addSource(kafkaConsumer, "playerEvents(Kafka)")
>     .name("EP - Read player events from Kafka")
>     .uid("EP - Read player events from Kafka")
>     .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class)))
>     .returns(HashMap.class)
>     .name("EP - Map Json to HashMap")
>     .uid("EP - Map Json to HashMap")
>     .filter((FilterFunction<HashMap>) event -> event.get(Field.SESSION_ID) != null)
>     .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>     .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>     // Assumed intent: a FilterFunction must return a boolean, so keep only events with an accountId
>     .filter((FilterFunction<HashMap>) event -> event.get(Field.ACCOUNT_ID) != null)
>     .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
>     .window(ProcessingTimeSessionWindows.withGap(
>             org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
>     .trigger(new SessionTrigger())
>     .aggregate(new SummaryAggregator())
>     .name("EP - Aggregate events into session summaries")
>     .uid("EP - Aggregate events into session summaries");
>
> summaries.print();
>
> With the following trigger (omitting parts of the trigger):
>
> [ ... ]
> @Override
> public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window,
>         TriggerContext ctx) throws Exception {
>     ValueState<Boolean> firstSeen = ctx.getPartitionedState(
>             new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
>
>     // If an end event is detected, emit the content and purge
>     if (endSession.contains(element.get(Field.EVENT_TYPE))) {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     // Register the periodic (10 s) timer and the window-end timer only once per window
>     if (firstSeen.value() == null) {
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>         firstSeen.update(true);
>     }
>     logger.info("Current window end is {} for session {}",
>             window.maxTimestamp(), element.get(Field.SESSION_ID));
>     return TriggerResult.CONTINUE;
> }
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window,
>         TriggerContext ctx) throws Exception {
>     // At the window end (no events during the session gap), emit and purge;
>     // otherwise emit the current result and schedule the next 10 s firing
>     if (time == window.maxTimestamp()) {
>         return TriggerResult.FIRE_AND_PURGE;
>     } else {
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
>         return TriggerResult.FIRE;
>     }
> }
> [ ... ]
>
> So what I'm doing is calling ctx.registerProcessingTimeTimer(window.maxTimestamp());
> however, I only do this once, at the first event. But when testing, it does work
> as I want: it fires every ten seconds and then fires and purges only after no events
> have been received for 2 minutes (as specified in the SessionWindow). Is
> the processingTimeTimer being updated every time the window end time is
> increased (I noticed that Flink does this in the background every time a
> new event arrives)?
>
> I'm happy with my solution, just trying to figure out how things work!
>
> Cheers,
> Tim
>
>
> On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> If you use the Trigger API, then you don't have to do anything special
>> for fault tolerance.
>> When using the ProcessFunction, you should use
>> Flink's state primitives to store your state (e.g. ValueState). This will
>> automatically checkpoint the state. In case of a failure, Flink will always
>> resume from the latest successfully completed checkpoint.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <tim.josefs...@webstep.se>
>> wrote:
>>
>>> Thanks for the suggestions! I'll see if I can implement something that
>>> works!
>>> A follow-up question, more related to state: if I implement either the
>>> custom trigger or the process function, how will they handle crashes
>>> and such? For instance, if I have a checkpointing interval of 10s, will
>>> the job recover from the last checkpoint with all the summaries as they
>>> were at that point, or do I have to implement specific ValueStates in both
>>> cases?
>>>
>>> On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> I think you could use Flink's trigger API [1] to implement a trigger
>>>> which fires when it sees a certain event or after some time.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <tim.josefs...@webstep.se>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I'm trying to figure out how to implement a window that will emit
>>>>> events at regular intervals or when a specific event is encountered.
>>>>>
>>>>> A bit of background: I have a stream of events from devices that
>>>>> send events to our system whenever a user watches a video. These events
>>>>> include a unique id (sessionId) shared by all events of the same
>>>>> session, so I want to key my stream on this. After that I want to aggregate
>>>>> all the events into a session summary and emit this summary every
>>>>> 5 minutes, while still keeping the summary in the window (in case
>>>>> more events for that session arrive). However, if I receive an end
>>>>> event (sent by the device once a user stops watching the video), I want to
>>>>> emit the summary and remove it from the window.
>>>>>
>>>>> Is it possible to do this with one of the existing windows together
>>>>> with a trigger, or in some other way? I've been trying to figure it out by
>>>>> reading the docs but haven't gotten any wiser, so I'm turning to the mailing
>>>>> list for help.
>>>>>
>>>>> Best regards,
>>>>> Tim
>>>>>
>>>>
>>>
>>> --
>>>
>>> *Tim Josefsson*
>>> Webstep GPtW <http://www.webstep.se/>
>>> mobile +46 (0) 707 81 91 12
>>> phone +46 (0) 8 21 40 70
>>>
>>> tim.josefs...@webstep.se
>>> *webstep.se <http://www.webstep.se/>*
>>> Suttungs gränd 2
>>> 753 19 Uppsala
>>> Stockholm | Uppsala | Malmö | Sundsvall | Oslo
>>> Bergen | Stavanger | Trondheim | Kristiansand
>>> LinkedIn <http://www.linkedin.com/company/webstep-ab>
>>> Facebook <http://www.facebook.com/webstepAB>
>>> Instagram <http://www.instagram.com/webstep_sverige>
>>>
>>
>