The problem is: why is the window end time in the future?

For example, if my window size is 60 seconds and the window is evaluated at
3:00 pm, why is the window end time 3:01 pm and not 3:00 pm, even though the
data being evaluated falls in the window 2:59 - 3:00?
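
To make the arithmetic concrete, here is a minimal sketch of how I would expect the bounds of a tumbling window to be derived from a timestamp. It assumes windows are aligned to the epoch with no offset. The names `WindowBounds` and `windowStart` are made up for illustration and are not Flink APIs. The firing time is the `currenttime` value from the output quoted below.

```scala
object WindowBounds {
  // Start of the epoch-aligned tumbling window that contains `timestamp`.
  // Assumption: no window offset is configured.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  def main(args: Array[String]): Unit = {
    val size = 60000L            // window length in ms, as in the quoted program
    val firedAt = 1465234200007L // "currenttime" printed when the window function ran

    val containing = windowStart(firedAt, size) // window the firing time falls into
    val justClosed = containing - size          // window that ended at the firing time

    println(s"window containing the firing time: [$containing, ${containing + size})")
    println(s"window that just closed:           [$justClosed, $containing)")
  }
}
```

With these values, the window containing the firing time is [1465234200000, 1465234260000), which is what the program reports. The window that just closed is [1465234140000, 1465234200000), which is what I expected to see.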

> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Could you state a specific problem?
> 
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>> I have a simple program that reads input from the command line (a socket
>> stream) and then aggregates it by key.
>> 
>> When I run this program on my local machine, I see output that is
>> counterintuitive given my understanding of windows in Flink.
>> 
>> The window start time is around the time the functions are evaluated.
>> However, the window end time is around 60 s (the window size) after
>> the current time (please see below).
>> 
>> Can someone explain this behaviour, please?
>> 
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> import org.apache.flink.util.Collector
>> 
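>> // Event was not defined in the original post; this definition is an assumption
>> // inferred from how it is used below (the third field name is a guess).
>> case class Event(key: String, value: Int, timestamp: Long)
>> 
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)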
>> 
>> object Processor {
>> 
>>   val window_length = 60000 // milliseconds
>> 
>>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event],
>>                       out: Collector[EventAgg]): Unit = {
>>     var sum = 0
>>     for (e <- in) {
>>       sum = sum + e.value
>>     }
>>     val start = window.getStart
>>     val end = window.getEnd
>>     val diff = (end - start)
>>     println(s" windowId: ${window.hashCode()} currenttime: " +
>>       s"${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>> 
>> 
>>     out.collect(
>>       new EventAgg(
>>         start = window.getStart,
>>         end = window.getEnd,
>>         key = key,
>>         value = sum
>>       )
>>     )
>>   }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> 
>>     val sevents = env.socketTextStream("localhost", 9000)
>>     sevents
>>       .map(x => parseEvent(x))
>>       .keyBy(_.key)
>>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>       .map("Default Assigner: " + System.currentTimeMillis().toString +
>>         " - " + _.toString)
>>       .print()
>> 
>>     env.execute("Event time windows")
>>   }
>> 
>>   def parseEvent(s: String): Event = {
>>     if (s == null || s.trim().length == 0)
>>       Event("default", 0, 0L)
>>     else {
>>       val parts = s.split(",")
>>       Event(parts(0), parts(1).toInt, 1L)
>>     }
>>   }
>> }
>> 
>> Output
>> 
>>  windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
>>  windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
>> 3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
>> 7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)
>> 
>> 
> 