I have a simple program that reads input typed on the command line (through a
socket stream) and then aggregates it by key.

When I run this program on my local machine, I see output that is
counterintuitive given my understanding of windows in Flink.

The start time of the window is roughly the time at which the window function
is evaluated. However, *the window end time is about 60 s (the window size)
after the current time* (see the output below).

Can someone please explain this behaviour?

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/** One aggregate per key and window, as emitted by `Processor.aggregateEvents`.
  *
  * @param start start timestamp of the window (epoch milliseconds in the sample output)
  * @param end   end timestamp of the window
  * @param key   key the events were grouped by
  * @param value sum of the `value` fields of the events in the window
  */
case class EventAgg(
    start: Long,
    end: Long,
    key: String,
    value: Int
)

/** Reads "key,value" lines from a local socket, sums the values per key over
  * tumbling processing-time windows, and prints one aggregate per key and window.
  */
object Processor {

  /** Tumbling window size in milliseconds (60 s). */
  val window_length = 60000 // milliseconds

  /** Window function: sums `value` over all events of one key in one window,
    * logs the window bounds next to the wall-clock time at which this function
    * runs, and emits a single [[EventAgg]].
    *
    * @param key    the key the stream was partitioned by
    * @param window the window being evaluated; supplies the start/end timestamps
    * @param in     all events assigned to this key and window
    * @param out    collector that receives exactly one aggregate per call
    */
  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event],
                      out: Collector[EventAgg]): Unit = {
    val sum = in.foldLeft(0)(_ + _.value)
    val start = window.getStart
    val end = window.getEnd
    // Logging the current wall-clock time beside [start, end) is what makes it
    // possible to see when the window function fires relative to the window.
    println(
      s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} " +
        s"key:[$key] start: $start end: $end diff: ${end - start}")

    out.collect(EventAgg(start = start, end = end, key = key, value = sum))
  }

  /** Builds and runs the streaming job: socket -> parse -> keyBy -> tumbling
    * processing-time window -> aggregate -> print.
    *
    * @param Args command-line arguments (unused)
    */
  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Alternatives tried: TimeCharacteristic.EventTime / TimeCharacteristic.IngestionTime.
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    env
      .socketTextStream("localhost", 9000)
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map(agg => s"Default Assigner: ${System.currentTimeMillis()} - $agg")
      .print()

    // NOTE(review): the job name says "Event time" but the job is configured for
    // processing time above; consider renaming if nothing depends on this name.
    env.execute("Event time windows")
  }

  /** Parses one "key,value" line into an [[Event]].
    *
    * Fields are trimmed, so "a, 3" parses like "a,3"; extra fields are ignored.
    * A null, blank or malformed line (no comma, non-integer value) yields the
    * placeholder `Event("default", 0, 0L)` instead of throwing, so a single
    * mistyped line on the socket cannot fail the whole job.
    *
    * @param s raw input line, possibly null
    * @return the parsed event, or the placeholder for unusable input
    */
  def parseEvent(s: String): Event = {
    val placeholder = Event("default", 0, 0L)
    Option(s).map(_.split(",").map(_.trim)) match {
      case Some(Array(key, value, _*)) =>
        scala.util.Try(value.toInt)
          .map(v => Event(key, v, 1L))
          .getOrElse(placeholder)
      case _ => placeholder
    }
  }
}


*Output*

 windowId: -663519360 currenttime: 1465234200007 key:[a] start:
1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start:
1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 -
EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 -
EventAgg(1465234200000,1465234260000,b,4)

Reply via email to