I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a gap of 30 minutes.
But as soon as I start the job, events are written to the sink (I can see them in S3) even though 30 minutes have not passed. This is my job: val stream = senv .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer, properties)) .filter(_.sessionId.nonEmpty) .flatMap(_ match { case (_, events) => events }) .assignTimestampsAndWatermarks(new TimestampExtractor[Event](Time.minutes(10)) { override def extractTimestamp(element: Event): Long = event.sequence / 1000 // microseconds }) .keyBy(_.sessionId) .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES))) .process(myProcessWindowFunction) AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100) Any idea why it's happening?