I have a simple program that reads input from a socket stream (fed from the command line) and aggregates it by key.
// When running this program on my local machine, I see output that is counterintuitive to my
// understanding of windows in Flink. The start time of the window is around the time the window
// function is evaluated. However, *the window end time is around 60 s (the window size) after the
// current time* (see the output below). Can someone explain this behaviour, please?

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Try

/** Per-key result of one window: the window bounds, the key, and the sum of the events' values. */
final case class EventAgg(start: Long, end: Long, key: String, value: Int)

object Processor {

  /** Tumbling window size. */
  val window_length = 60000 // milliseconds

  /** Window function: sums `value` over all events of one key in one window and emits one [[EventAgg]].
    *
    * It also prints the window's hash code, the wall-clock time at which this function runs, and the
    * window bounds, so the actual firing time can be compared with the window bounds.
    *
    * NOTE(review): per Flink's `TimeWindow` API docs, `getEnd` is exclusive (`maxTimestamp == end - 1`).
    * A processing-time window is expected to fire once processing time reaches `maxTimestamp`, which is
    * near `end`, not near `start`. The sample output at the bottom shows firing near `start`. Confirm
    * against the Flink version in use.
    *
    * @param key    the key shared by all events in `in`
    * @param window the window being evaluated
    * @param in     the events assigned to `window` for `key`
    * @param out    receives exactly one [[EventAgg]] per call
    */
  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    val sum = in.foldLeft(0)(_ + _.value)
    val start = window.getStart
    val end = window.getEnd
    val diff = end - start
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
    out.collect(EventAgg(start = start, end = end, key = key, value = sum))
  }

  /** Entry point.
    *
    * Reads `key,value` lines from a socket on localhost:9000 and sums the values per key in tumbling
    * processing-time windows of `window_length` ms. Each aggregate is printed, prefixed with the
    * wall-clock time at which it reaches the final map.
    */
  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)

    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      // The ascription on the last placeholder presumably lets the compiler pick the
      // function-based overload of `apply`; keep it.
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      // The timestamp is read per element, when each aggregate passes through this map.
      .map(agg => s"Default Assigner: ${System.currentTimeMillis()} - $agg")
      .print()

    env.execute("Event time windows")
  }

  /** Parses one `key,value` input line into an [[Event]].
    *
    * Fields are trimmed, so `"a, 3"` parses like `"a,3"`. Anything after the second field is ignored.
    * Null, blank, or malformed lines (fewer than two fields, or a non-integer value) yield the
    * placeholder `Event("default", 0, 0L)` instead of throwing, which would fail the whole job.
    *
    * NOTE(review): the third `Event` argument is 1L for parsed lines and 0L for the placeholder.
    * It is presumably a count; confirm against `Event`'s definition.
    */
  def parseEvent(s: String): Event = {
    val placeholder = Event("default", 0, 0L)
    Option(s).map(_.trim).filter(_.nonEmpty) match {
      case None => placeholder
      case Some(line) =>
        line.split(",").map(_.trim) match {
          case Array(key, rawValue, _*) =>
            Try(rawValue.toInt).map(Event(key, _, 1L)).getOrElse(placeholder)
          case _ => placeholder
        }
    }
  }
}

// Output:
// windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
// windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
// 3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
// 7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)