I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a
gap of 30 minutes.
But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.
This is my job:
val stream = senv
.addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer,
properties))
.filter(_.sessionId.nonEmpty)
.flatMap(_ match { case (_, events) => events })
.assignTimestampsAndWatermarks(new
TimestampExtractor[Event](Time.minutes(10)) {
override def extractTimestamp(element: Event): Long =
event.sequence / 1000 // microseconds
})
.keyBy(_.sessionId)
.window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
.process(myProcessWindowFunction)
AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)
Any idea why it's happening?