Ori Popowski created FLINK-25007:
------------------------------------

             Summary: Session window with dynamic gap doesn't work
                 Key: FLINK-25007
                 URL: https://issues.apache.org/jira/browse/FLINK-25007
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.12.0
         Environment: Local environment
            Reporter: Ori Popowski

I am creating a simple application in which events fire every 15 seconds. I created a {{SessionWindowTimeGapExtractor}} that returns 90 minutes for the first four events and 1 millisecond for every event after that.

I expected a session window to trigger after the 4th event, but that is not what happens. In reality the session window never triggers, even though after the 4th event the session gap is effectively 1 millisecond and the interval between events is 15 seconds.

{code:scala}
import java.time.Instant

import scala.concurrent.duration._

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{DynamicEventTimeSessionWindows, SessionWindowTimeGapExtractor}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object Main {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val now = Instant.now()

    senv
      .addSource(new Source(now))
      .assignAscendingTimestamps(_.time.toEpochMilli)
      .keyBy(_ => 1) // single key, so all elements share one session
      .window(DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[Element] {
        // 90-minute gap until an element is flagged as the session end, then 1 ms
        override def extract(element: Element): Long =
          if (element.sessionEnd) 1 else 90.minutes.toMillis
      }))
      .process(new ProcessWindowFunction[Element, Vector[Element], Int, TimeWindow] {
        // Emit the whole window content as one Vector when the window fires
        override def process(k: Int, context: Context, elements: Iterable[Element], out: Collector[Vector[Element]]): Unit = {
          out.collect(elements.toVector)
        }
      })
      .print()

    senv.execute()
  }
}

case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)

class Source(now: Instant) extends RichSourceFunction[Element] {
  @volatile private var isRunning = true
  private var totalInterval = 0L
  private var i = 0

  override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
    while (isRunning) {
      // Event time advances by 15 seconds per element, starting at `now`
      val element = Element(i, now.plusMillis(totalInterval))

      // Elements 0 to 3 keep the long gap; from element 4 on, flag the session end
      if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
      else ctx.collect(element)

      i += 1
      totalInterval += 15.seconds.toMillis
      Thread.sleep(15.seconds.toMillis)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
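
One way to reason about the behavior described above is to model the window arithmetic outside Flink. The sketch below is a standalone model, not Flink code: {{Win}}, {{assign}}, and {{mergeAll}} are hypothetical names, and it assumes that each element is first assigned its own window [timestamp, timestamp + gap) and that windows which touch or overlap are merged into their union. Timestamps are relative to the first event, using the 15-second spacing and the 90-minute / 1-millisecond gaps from the report.

```scala
// Hypothetical standalone model of session-window merging; none of these
// names are Flink API. Assumption: windows that touch or overlap are merged
// into the smallest window covering both.
final case class Win(start: Long, end: Long) {
  def intersects(other: Win): Boolean = start <= other.end && end >= other.start
  def cover(other: Win): Win = Win(math.min(start, other.start), math.max(end, other.end))
}

object SessionMergeSketch {
  val LongGapMs: Long = 90L * 60 * 1000 // 90 minutes
  val ShortGapMs: Long = 1L             // 1 millisecond
  val StepMs: Long = 15L * 1000         // one event every 15 seconds

  // Same rule as the extractor in the report: elements with id >= 4 get the short gap.
  def gapFor(id: Int): Long = if (id >= 4) ShortGapMs else LongGapMs

  // Each element starts out with its own window [timestamp, timestamp + gap).
  def assign(id: Int): Win = {
    val ts = id * StepMs
    Win(ts, ts + gapFor(id))
  }

  // Fold every new window into the set, merging it with all windows it intersects.
  def mergeAll(windows: Seq[Win]): List[Win] =
    windows.foldLeft(List.empty[Win]) { (acc, w) =>
      val (hit, miss) = acc.partition(_.intersects(w))
      hit.foldLeft(w)(_ cover _) :: miss
    }

  def main(args: Array[String]): Unit = {
    val merged = mergeAll((0 to 5).map(assign))
    merged.foreach(w => println(s"window [${w.start}, ${w.end})"))
  }
}
```

Under these assumptions the six windows collapse into a single window [0, 5445000), whose end is 90 minutes after the 4th event (timestamp 45000). The 1 ms windows of the later elements fall inside the window opened by the first four, so in this model they do not move its end earlier.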