[ https://issues.apache.org/jira/browse/FLINK-25007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ori Popowski updated FLINK-25007: --------------------------------- Description: I am creating a simple application with events firing every 15 seconds. I created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th event, it should return 1 millisecond. I expected that after the 4th event, a session window would trigger, but that is not what happens. In reality the session window never triggers, even though after the 4th event, the session gap is effectively 1 millisecond and the interval between events is 15 seconds. {code:java} object Main { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val now = Instant.now() senv .addSource(new Source(now)) .assignAscendingTimestamps(_.time.toEpochMilli) .keyBy(_ => 1) .window(DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[Element] { override def extract(element: Element): Long = { if (element.sessionEnd) 1 else 90.minutes.toMillis } })) .process(new ProcessWindowFunction[Element, Vector[Element], Int, TimeWindow] { override def process(k: Int, context: Context, elements: Iterable[Element], out: Collector[Vector[Element]]): Unit = { out.collect(elements.toVector) } }) .print() senv.execute() } } case class Element(id: Int, time: Instant, sessionEnd: Boolean = false) class Source(now: Instant) extends RichSourceFunction[Element] { @volatile private var isRunning = true private var totalInterval = 0L private var i = 0 override def run(ctx: SourceFunction.SourceContext[Element]): Unit = { while (isRunning) { val element = Element(i, now.plusMillis(totalInterval)) if (i >= 4) ctx.collect(element.copy(sessionEnd = true)) else ctx.collect(element) i += 1 totalInterval += 15.seconds.toMillis Thread.sleep(15.seconds.toMillis) } } override def cancel(): Unit = { isRunning = false } }{code} was: I am creating a simple application with events firing every 15 
seconds. I created a {{ SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th event, it should return 1 millisecond. I expected that after the 4th event, a session window will trigger, but it's not what happens. In reality the session window never triggers, even though after the 4th event, the session gap is effectively 1 millisecond and the interval between events is 15 seconds. {code:java} object Main { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val now = Instant.now() senv .addSource(new Source(now)) .assignAscendingTimestamps(_.time.toEpochMilli) .keyBy(_ => 1) .window(DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[Element] { override def extract(element: Element): Long = { if (element.sessionEnd) 1 else 90.minutes.toMillis } })) .process(new ProcessWindowFunction[Element, Vector[Element], Int, TimeWindow] { override def process(k: Int, context: Context, elements: Iterable[Element], out: Collector[Vector[Element]]): Unit = { out.collect(elements.toVector) } }) .print() senv.execute() } } case class Element(id: Int, time: Instant, sessionEnd: Boolean = false) class Source(now: Instant) extends RichSourceFunction[Element] { @volatile private var isRunning = true private var totalInterval = 0L private var i = 0 override def run(ctx: SourceFunction.SourceContext[Element]): Unit = { while (isRunning) { val element = Element(i, now.plusMillis(totalInterval)) if (i >= 4) ctx.collect(element.copy(sessionEnd = true)) else ctx.collect(element) i += 1 totalInterval += 15.seconds.toMillis Thread.sleep(15.seconds.toMillis) } } override def cancel(): Unit = { isRunning = false } }{code} > Session window with dynamic gap doesn't work > -------------------------------------------- > > Key: FLINK-25007 > URL: https://issues.apache.org/jira/browse/FLINK-25007 > Project: Flink > Issue Type: Bug > 
Components: API / DataStream > Affects Versions: 1.12.0 > Environment: Local environment > Reporter: Ori Popowski > Priority: Major > > I am creating a simple application with events firing every 15 seconds. I > created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but > after the 4th event, it should return 1 millisecond. I expected that after > the 4th event, a session window would trigger, but that is not what happens. In > reality the session window never triggers, even though after the 4th event, > the session gap is effectively 1 millisecond and the interval between events > is 15 seconds. > > {code:java} > object Main { > def main(args: Array[String]): Unit = { > val senv = StreamExecutionEnvironment.getExecutionEnvironment > senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val now = Instant.now() > senv > .addSource(new Source(now)) > .assignAscendingTimestamps(_.time.toEpochMilli) > .keyBy(_ => 1) > .window(DynamicEventTimeSessionWindows.withDynamicGap(new > SessionWindowTimeGapExtractor[Element] { > override def extract(element: Element): Long = { > if (element.sessionEnd) 1 > else 90.minutes.toMillis > } > })) > .process(new ProcessWindowFunction[Element, Vector[Element], Int, > TimeWindow] { > override def process(k: Int, context: Context, elements: > Iterable[Element], out: Collector[Vector[Element]]): Unit = { > out.collect(elements.toVector) > } > }) > .print() > senv.execute() > } > } > case class Element(id: Int, time: Instant, sessionEnd: Boolean = false) > class Source(now: Instant) extends RichSourceFunction[Element] { > @volatile private var isRunning = true > private var totalInterval = 0L > private var i = 0 > override def run(ctx: SourceFunction.SourceContext[Element]): Unit = { > while (isRunning) { > val element = Element(i, now.plusMillis(totalInterval)) > if (i >= 4) ctx.collect(element.copy(sessionEnd = true)) > else ctx.collect(element) > i += 1 > totalInterval += 15.seconds.toMillis > 
Thread.sleep(15.seconds.toMillis) > } > } > override def cancel(): Unit = { > isRunning = false > } > }{code} > > -- This message was sent by Atlassian Jira (v8.20.1#820001)