Hi, I’ve tested your code in my local environment and everything worked fine. It’s a little weird to see your output like that. I wonder if you could give more information about your environment, e.g., your flink version and execution settings.
Thanks, Xingcan > On Sep 16, 2018, at 3:19 PM, 远远 <zhao137578...@gmail.com> wrote: > > hi,everyone: > today, i test Sliding ProcessTime TimeWindow with print some merties. i find > a same sliding window be printed twice, as fllower: > > now ===> 2018-09-16 15:11:44 > start ===> 2018-09-16 15:10:45 > end ===> 2018-09-16 15:11:45 > max ===> 2018-09-16 15:11:44 > TimeWindow{start=1537081845000, end=1537081905000} > aggreation > now ===> 2018-09-16 15:11:45 > start ===> 2018-09-16 15:10:45 > end ===> 2018-09-16 15:11:45 > max ===> 2018-09-16 15:11:44 > TimeWindow{start=1537081845000, end=1537081905000} > aggreation > > but when i do some sum operator,it will not, i want to know why? > thanks. > > my test code is: > object SlidingProcessTimeWindowTest { > > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.addSource((context: SourceContext[String]) => {while(true) > context.collect(new Random().nextInt(100) + ":FRI")}) > .keyBy(s => s.endsWith("FRI")) > .timeWindow(Time.minutes(1), Time.seconds(5)) > .apply((e, w, iter, coll: Collector[String]) => { > println("now ===> " + convert(DateTime.now().getMillis)) > println("start ===> " + convert(w.getStart)) > println("end ===> " + convert(w.getEnd)) > println("max ===> " + convert(w.maxTimestamp())) > println(w) > // var reduce: Long = 0 > // for(e <- iter){ > // reduce += e.substring(0, e.length - 4).toInt > // } > // println("reduce ==> " + reduce) > coll.collect("aggreation") > }).setParallelism(1).print().setParallelism(1) > > env.execute() > } > > def convert(time: Long): String = { > new DateTime(time).toString("yyyy-MM-dd HH:mm:ss") > } > }