[ 
https://issues.apache.org/jira/browse/FLINK-25007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ori Popowski updated FLINK-25007:
---------------------------------
    Description: 
I am creating a simple application with events firing every 15 seconds. I 
created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but after 
the 4th event, it should return 1 millisecond. I expected that after the 4th 
event, a session window will trigger, but it's not what happens. In reality the 
session window never triggers, even though after the 4th event, the session gap 
is effectively 1 millisecond and the interval between events is 15 seconds.

 
{code:java}
object Main {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val now = Instant.now()

    senv
      .addSource(new Source(now))
      .assignAscendingTimestamps(_.time.toEpochMilli)
      .keyBy(_ => 1)
      .window(DynamicEventTimeSessionWindows.withDynamicGap(new 
SessionWindowTimeGapExtractor[Element] {
        override def extract(element: Element): Long = {
          if (element.sessionEnd) 1
          else 90.minutes.toMillis
        }
      }))
      .process(new ProcessWindowFunction[Element, Vector[Element], Int, 
TimeWindow] {
        override def process(k: Int, context: Context, elements: 
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
          out.collect(elements.toVector)
        }
      })
      .print()

    senv.execute()
  }
}

case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)

class Source(now: Instant) extends RichSourceFunction[Element] {
  @volatile private var isRunning = true
  private var totalInterval = 0L
  private var i = 0

  override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
    while (isRunning) {
      val element = Element(i, now.plusMillis(totalInterval))

      if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
      else ctx.collect(element)

      i += 1
      totalInterval += 15.seconds.toMillis
      Thread.sleep(15.seconds.toMillis)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}{code}
 

 

  was:
I am creating a simple application with events firing every 15 seconds. I 
created a {{

SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th 
event, it should return 1 millisecond. I expected that after the 4th event, a 
session window will trigger, but it's not what happens. In reality the session 
window never triggers, even though after the 4th event, the session gap is 
effectively 1 millisecond and the interval between events is 15 seconds.

 
{code:java}
object Main {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val now = Instant.now()

    senv
      .addSource(new Source(now))
      .assignAscendingTimestamps(_.time.toEpochMilli)
      .keyBy(_ => 1)
      .window(DynamicEventTimeSessionWindows.withDynamicGap(new 
SessionWindowTimeGapExtractor[Element] {
        override def extract(element: Element): Long = {
          if (element.sessionEnd) 1
          else 90.minutes.toMillis
        }
      }))
      .process(new ProcessWindowFunction[Element, Vector[Element], Int, 
TimeWindow] {
        override def process(k: Int, context: Context, elements: 
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
          out.collect(elements.toVector)
        }
      })
      .print()

    senv.execute()
  }
}

case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)

class Source(now: Instant) extends RichSourceFunction[Element] {
  @volatile private var isRunning = true
  private var totalInterval = 0L
  private var i = 0

  override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
    while (isRunning) {
      val element = Element(i, now.plusMillis(totalInterval))

      if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
      else ctx.collect(element)

      i += 1
      totalInterval += 15.seconds.toMillis
      Thread.sleep(15.seconds.toMillis)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}{code}
 

 


> Session window with dynamic gap doesn't work
> --------------------------------------------
>
>                 Key: FLINK-25007
>                 URL: https://issues.apache.org/jira/browse/FLINK-25007
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.0
>         Environment: Local environment
>            Reporter: Ori Popowski
>            Priority: Major
>
> I am creating a simple application with events firing every 15 seconds. I 
> created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but 
> after the 4th event, it should return 1 millisecond. I expected that after 
> the 4th event, a session window will trigger, but it's not what happens. In 
> reality the session window never triggers, even though after the 4th event, 
> the session gap is effectively 1 millisecond and the interval between events 
> is 15 seconds.
>  
> {code:java}
> object Main {
>   def main(args: Array[String]): Unit = {
>     val senv = StreamExecutionEnvironment.getExecutionEnvironment
>     senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val now = Instant.now()
>     senv
>       .addSource(new Source(now))
>       .assignAscendingTimestamps(_.time.toEpochMilli)
>       .keyBy(_ => 1)
>       .window(DynamicEventTimeSessionWindows.withDynamicGap(new 
> SessionWindowTimeGapExtractor[Element] {
>         override def extract(element: Element): Long = {
>           if (element.sessionEnd) 1
>           else 90.minutes.toMillis
>         }
>       }))
>       .process(new ProcessWindowFunction[Element, Vector[Element], Int, 
> TimeWindow] {
>         override def process(k: Int, context: Context, elements: 
> Iterable[Element], out: Collector[Vector[Element]]): Unit = {
>           out.collect(elements.toVector)
>         }
>       })
>       .print()
>     senv.execute()
>   }
> }
> case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)
> class Source(now: Instant) extends RichSourceFunction[Element] {
>   @volatile private var isRunning = true
>   private var totalInterval = 0L
>   private var i = 0
>   override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
>     while (isRunning) {
>       val element = Element(i, now.plusMillis(totalInterval))
>       if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
>       else ctx.collect(element)
>       i += 1
>       totalInterval += 15.seconds.toMillis
>       Thread.sleep(15.seconds.toMillis)
>     }
>   }
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to