Hi, I'm afraid you're running into a bug in the special processing-time window operator. A suggested workaround is to switch to the IngestionTime time characteristic and use TumblingEventTimeWindows.
I also opened a Jira issue for the bug so that we can keep track of it: https://issues.apache.org/jira/browse/FLINK-4028

Cheers,
Aljoscha

On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <soumya.sima...@gmail.com> wrote:

> The problem is: why is the window end time in the future?
>
> For example, if my window size is 60 seconds and my window is being
> evaluated at 3.00 pm, why is the window end time 3.01 pm and not 3.00
> pm, even though the data being evaluated falls in the window 2.59 -
> 3.00?
>
> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ches...@apache.org> wrote:
>
> could you state a specific problem?
>
> On 07.06.2016 06:40, Soumya Simanta wrote:
>
> I have a simple program that takes some input from the command line (socket
> stream) and then aggregates it by key.
>
> When running this program on my local machine I see some output that is
> counterintuitive to my understanding of windows in Flink.
>
> The start time of the window is around the time the functions are being
> evaluated. However, *the window end time is around 60 s (the window size)
> after the current time (please see below).*
>
> Can someone explain this behaviour, please?
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> // Event was not included in the message; this definition is assumed from
> // how parseEvent constructs it (the name of the third field is a guess).
> case class Event(key: String, value: Int, count: Long)
>
> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>
> object Processor {
>
>   val window_length = 60000 // milliseconds
>
>   // Sums the values of all events for one key in one window and prints the
>   // window bounds next to the current wall-clock time.
>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event],
>                       out: Collector[EventAgg]): Unit = {
>     var sum = 0
>     for (e <- in) {
>       sum = sum + e.value
>     }
>     val start = window.getStart
>     val end = window.getEnd
>     val diff = end - start
>     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>
>     out.collect(
>       new EventAgg(
>         start = window.getStart,
>         end = window.getEnd,
>         key = key,
>         value = sum
>       )
>     )
>   }
>
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>     val sevents = env.socketTextStream("localhost", 9000)
>     sevents
>       .map(x => parseEvent(x))
>       .keyBy(_.key)
>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>       // The type ascription selects the function-based apply overload.
>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>       .map(agg => "Default Assigner: " + System.currentTimeMillis() + " - " + agg)
>       .print()
>
>     env.execute("Event time windows")
>   }
>
>   // Parses "key,value" lines; blank lines become a default event.
>   def parseEvent(s: String): Event = {
>     if (s == null || s.trim().length == 0)
>       Event("default", 0, 0L)
>     else {
>       val parts = s.split(",")
>       Event(parts(0), parts(1).toInt, 1L)
>     }
>   }
> }
>
> *Output*
>
> windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
> windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
> 3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
> 7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)
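
To make the discrepancy concrete, here is a small plain-Scala sketch of epoch-aligned tumbling-window arithmetic. It does not use Flink. It assumes a zero window offset, which matches the start/end values in the output above. The object and method names (WindowBounds, windowStart, windowFor) are made up for illustration. The sketch computes which 60 s window should have closed at the reported processing time and compares it with the window the function actually received:

```scala
// Plain-Scala sketch of tumbling-window alignment (no Flink dependency).
// Assumption: windows are aligned to the epoch with zero offset.
object WindowBounds {

  // Start of the tumbling window of length `size` that contains `timestamp`.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  // The half-open interval [start, end) that contains `timestamp`.
  def windowFor(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val size = 60000L
    // Processing time at which the window function ran (copied from the output).
    val firedAt = 1465234200007L
    // The window that has just closed ends at the most recent boundary, so it is
    // the window containing the last millisecond before that boundary.
    val boundary = windowStart(firedAt, size)
    val expected = windowFor(boundary - 1, size)
    // The window the function actually received (copied from the output).
    val reported = (1465234200000L, 1465234260000L)
    println(s"expected: $expected")
    println(s"reported: $reported")
    println(s"shift: ${reported._1 - expected._1} ms")
  }
}
```

Running it reports a shift of 60000 ms, exactly one window size. The function fired right at the 1465234200000 boundary but was handed the window starting there, not the window [1465234140000, 1465234200000) that had just ended. This matches the symptom in the question.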