Hi, I'm trying to migrate a Kafka Streams application to Flink (using the DataStream API). The application consumes a high-traffic Kafka topic (millions of events per second) and collects events into sessions keyed by id. To reduce the load on subsequent processing steps I want to output one event on session start and one event on session end. So I set up a pipeline which keys the stream by id and aggregates the events over an event time session window with a gap of 4 seconds. I also implemented a custom trigger that fires when the first event arrives in a window.
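For reference, here is a simplified sketch of what I'm doing (not my exact code: Event, SessionAggregator and the trigger name are placeholders, and the aggregator is omitted). The trigger mirrors Flink's built-in EventTimeTrigger but additionally fires once when the first element arrives in a window; the reducing state is there so the "already fired" flag survives session window merges:

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FirstElementEventTimeTrigger extends Trigger<Object, TimeWindow> {

    // Merging (reducing) state so the flag is carried over when session windows merge.
    private final ReducingStateDescriptor<Boolean> firedDesc =
            new ReducingStateDescriptor<>("firedOnFirst", (a, b) -> a || b, Types.BOOLEAN);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Always keep the end-of-window timer registered.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ReducingState<Boolean> fired = ctx.getPartitionedState(firedDesc);
        if (fired.get() == null) {
            fired.add(true);
            return TriggerResult.FIRE; // fire once, on the first event of the session
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Fire again when the watermark passes the end of the session.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(firedDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(firedDesc);
        long end = window.maxTimestamp();
        if (end > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(end);
        }
    }
}

// Wiring (simplified; timestamps/watermarks are assigned on the Kafka source,
// and SessionAggregator emits the Start/End events):
//
//   events                                               // DataStream<Event>
//       .keyBy(Event::getId)
//       .window(EventTimeSessionWindows.withGap(Time.seconds(4)))
//       .trigger(new FirstElementEventTimeTrigger())
//       .aggregate(new SessionAggregator());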
When I run this pipeline I sometimes observe multiple calls to the aggregator's "createAccumulator" method for a given session id, and consequently duplicate session start and session end events for that id. So it looks like Flink is collecting the events into multiple sessions even though they have the same session id.

Examples:

Input events:

Event timestamp        Id
2022-09-06 08:00:00    ABC
2022-09-06 08:00:01    ABC
2022-09-06 08:00:02    ABC
2022-09-06 08:00:03    ABC
2022-09-06 08:00:04    ABC
2022-09-06 08:00:05    ABC

Problem 1, output events:

Event time             Id     Type
2022-09-06 08:00:00    ABC    Start
2022-09-06 08:00:03    ABC    End
2022-09-06 08:00:04    ABC    Start
2022-09-06 08:00:05    ABC    End

Problem 2, output events:

Event time             Id     Type
2022-09-06 08:00:00    ABC    Start
2022-09-06 08:00:03    ABC    Start
2022-09-06 08:00:04    ABC    End
2022-09-06 08:00:05    ABC    End

Expected output:

Event time             Id     Type
2022-09-06 08:00:00    ABC    Start
2022-09-06 08:00:05    ABC    End

Is this expected behaviour? How can I avoid getting duplicate session windows?

Thanks for your help,
Kristinn