I haven't dug too deep into the content. But seems like this line was the reason:
.keyBy(s => s.endsWith("FRI")) essentially you are creating two key partitions (True, False) where each one of them has its own sliding window I believe. Can you printout the key space for each of the window? -- Rong On Sun, Sep 16, 2018 at 1:23 AM 远远 <zhao137578...@gmail.com> wrote: > > > ---------- Forwarded message --------- > From: 远远 <zhao137578...@gmail.com> > Date: 2018年9月16日周日 下午4:08 > Subject: Re: why same Sliding ProcessTime TimeWindow triggered twice > To: <xingc...@gmail.com> > > > hi, the flink version that i test is 1.4.2, and i just run test code with > local env in IDEA, and all the setting in the test code. > my os is deepin(linux based debian) 15.7... > > and i try again, the print as flow: > now ===> 2018-09-16 16:06:09 > start ===> 2018-09-16 16:05:10 > end ===> 2018-09-16 16:06:10 > max ===> 2018-09-16 16:06:09 > TimeWindow{start=1537085110000, end=1537085170000} > aggreation > now ===> 2018-09-16 16:06:09 > start ===> 2018-09-16 16:05:10 > end ===> 2018-09-16 16:06:10 > max ===> 2018-09-16 16:06:09 > TimeWindow{start=1537085110000, end=1537085170000} > aggreation > now ===> 2018-09-16 16:06:16 > start ===> 2018-09-16 16:05:15 > end ===> 2018-09-16 16:06:15 > max ===> 2018-09-16 16:06:14 > TimeWindow{start=1537085115000, end=1537085175000} > aggreation > now ===> 2018-09-16 16:06:19 > start ===> 2018-09-16 16:05:20 > end ===> 2018-09-16 16:06:20 > max ===> 2018-09-16 16:06:19 > TimeWindow{start=1537085120000, end=1537085180000} > aggreation > now ===> 2018-09-16 16:06:20 > start ===> 2018-09-16 16:05:20 > end ===> 2018-09-16 16:06:20 > max ===> 2018-09-16 16:06:19 > TimeWindow{start=1537085120000, end=1537085180000} > aggreation > now ===> 2018-09-16 16:06:24 > start ===> 2018-09-16 16:05:25 > end ===> 2018-09-16 16:06:25 > max ===> 2018-09-16 16:06:24 > TimeWindow{start=1537085125000, end=1537085185000} > aggreation > now ===> 2018-09-16 16:06:24 > start ===> 2018-09-16 16:05:25 > end ===> 2018-09-16 16:06:25 > max ===> 2018-09-16 16:06:24 > TimeWindow{start=1537085125000, end=1537085185000} > aggreation > now ===> 2018-09-16 16:06:25 > start ===> 2018-09-16 16:05:25 > end ===> 2018-09-16 16:06:25 > max ===> 2018-09-16 16:06:24 > TimeWindow{start=1537085125000, end=1537085185000} > aggreation > now ===> 2018-09-16 16:06:29 > start ===> 2018-09-16 16:05:30 > end ===> 2018-09-16 16:06:30 > max ===> 2018-09-16 16:06:29 > TimeWindow{start=1537085130000, end=1537085190000} > aggreation > now ===> 2018-09-16 16:06:29 > start ===> 2018-09-16 16:05:30 > end ===> 2018-09-16 16:06:30 > max ===> 2018-09-16 16:06:29 > TimeWindow{start=1537085130000, end=1537085190000} > aggreation > now ===> 2018-09-16 16:06:30 > start ===> 2018-09-16 16:05:30 > end ===> 2018-09-16 16:06:30 > max ===> 2018-09-16 16:06:29 > TimeWindow{start=1537085130000, end=1537085190000} > aggreation > now ===> 2018-09-16 16:06:36 > start ===> 2018-09-16 16:05:35 > end ===> 2018-09-16 16:06:35 > max ===> 2018-09-16 16:06:34 > TimeWindow{start=1537085135000, end=1537085195000} > > > Xingcan Cui <xingc...@gmail.com> 于2018年9月16日周日 下午3:55写道: > >> Hi, >> >> I’ve tested your code in my local environment and everything worked fine. >> It’s a little weird to see your output like that. I wonder if you could >> give more information about your environment, e.g., your flink version and >> execution settings. >> >> Thanks, >> Xingcan >> >> On Sep 16, 2018, at 3:19 PM, 远远 <zhao137578...@gmail.com> wrote: >> >> hi,everyone: >> today, i test Sliding ProcessTime TimeWindow with print some merties. i >> find a same sliding window be printed twice, as fllower: >> >> now ===> 2018-09-16 15:11:44 >> start ===> 2018-09-16 15:10:45 >> end ===> 2018-09-16 15:11:45 >> max ===> 2018-09-16 15:11:44 >> TimeWindow{start=1537081845000, end=1537081905000} >> aggreation >> now ===> 2018-09-16 15:11:45 >> start ===> 2018-09-16 15:10:45 >> end ===> 2018-09-16 15:11:45 >> max ===> 2018-09-16 15:11:44 >> TimeWindow{start=1537081845000, end=1537081905000} >> aggreation >> >> but when i do some sum operator,it will not, i want to know why? >> thanks. >> >> my test code is: >> >> object SlidingProcessTimeWindowTest { >> >> def main(args: Array[String]): Unit = { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> env.addSource((context: SourceContext[String]) => {while(true) >> context.collect(new Random().nextInt(100) + ":FRI")}) >> .keyBy(s => s.endsWith("FRI")) >> .timeWindow(Time.minutes(1), Time.seconds(5)) >> .apply((e, w, iter, coll: Collector[String]) => { >> println("now ===> " + >> convert(DateTime.now().getMillis)) >> println("start ===> " + convert(w.getStart)) >> println("end ===> " + convert(w.getEnd)) >> println("max ===> " + convert(w.maxTimestamp())) >> println(w) >> // var reduce: Long = 0 >> // for(e <- iter){ >> // reduce += e.substring(0, e.length - 4).toInt >> // } >> // println("reduce ==> " + reduce) >> coll.collect("aggreation") >> }).setParallelism(1).print().setParallelism(1) >> >> env.execute() >> } >> >> def convert(time: Long): String = { >> new DateTime(time).toString("yyyy-MM-dd HH:mm:ss") >> } >> } >> >> >>