Hi everyone: today I tested a sliding processing-time TimeWindow and printed some of its properties. I found that the same sliding window is printed twice, as follows:
now ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end ===> 2018-09-16 15:11:45
max ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end ===> 2018-09-16 15:11:45
max ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation

But when I add a sum operation, this does not happen. I want to know why. Thanks.

My test code is:

/**
 * Test job that prints the bounds of each sliding time window when its
 * window function runs, then emits a fixed marker string for that window.
 */
object SlidingProcessTimeWindowTest {

  /**
   * Builds and runs the job: an endless source of `"<n>:FRI"` strings, keyed,
   * windowed with `Time.minutes(1)` / `Time.seconds(5)`, and printed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Endless source: each record is a random integer below 100 followed by ":FRI".
    val records = env.addSource((context: SourceContext[String]) => {
      while (true) {
        val value = new Random().nextInt(100)
        context.collect(s"$value:FRI")
      }
    })

    // Every generated record ends with "FRI", so they all share the key `true`.
    val windows = records
      .keyBy(record => record.endsWith("FRI"))
      .timeWindow(Time.minutes(1), Time.seconds(5))

    val results = windows.apply((key, window, elements, out: Collector[String]) => {
      // Report the current wall-clock time next to the window's own bounds.
      println(s"now ===> ${convert(DateTime.now().getMillis)}")
      println(s"start ===> ${convert(window.getStart)}")
      println(s"end ===> ${convert(window.getEnd)}")
      println(s"max ===> ${convert(window.maxTimestamp())}")
      println(window)

      // Disabled sum over the numeric prefix of each record in the window:
      // var reduce: Long = 0
      // for (e <- elements) {
      //   reduce += e.substring(0, e.length - 4).toInt
      // }
      // println("reduce ==> " + reduce)

      out.collect("aggreation")
    })

    results.setParallelism(1).print().setParallelism(1)
    env.execute()
  }

  /**
   * Formats a timestamp for display.
   *
   * @param time epoch milliseconds
   * @return the timestamp rendered with the pattern `yyyy-MM-dd HH:mm:ss`
   */
  def convert(time: Long): String = {
    val instant = new DateTime(time)
    instant.toString("yyyy-MM-dd HH:mm:ss")
  }
}