Hi,

I’ve tested your code in my local environment and everything worked fine. It’s 
a little weird to see your output like that. I wonder if you could give more 
information about your environment, e.g., your flink version and execution 
settings.

Thanks,
Xingcan

> On Sep 16, 2018, at 3:19 PM, 远远 <zhao137578...@gmail.com> wrote:
> 
> hi,everyone:
> today, i test Sliding ProcessTime TimeWindow with print some merties. i find 
> a same sliding window be printed twice, as fllower:
> 
> now   ===> 2018-09-16 15:11:44
> start ===> 2018-09-16 15:10:45
> end   ===> 2018-09-16 15:11:45
> max   ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
> now   ===> 2018-09-16 15:11:45
> start ===> 2018-09-16 15:10:45
> end   ===> 2018-09-16 15:11:45
> max   ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
>  
> but when i do some sum operator,it will not, i want to know why? 
> thanks. 
> 
> my test code is:
> object SlidingProcessTimeWindowTest {
> 
>     def main(args: Array[String]): Unit = {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment
>         env.addSource((context: SourceContext[String]) => {while(true) 
> context.collect(new Random().nextInt(100) + ":FRI")})
>                 .keyBy(s => s.endsWith("FRI"))
>                 .timeWindow(Time.minutes(1), Time.seconds(5))
>                 .apply((e, w, iter, coll: Collector[String]) => {
>                     println("now   ===> " + convert(DateTime.now().getMillis))
>                     println("start ===> " + convert(w.getStart))
>                     println("end   ===> " + convert(w.getEnd))
>                     println("max   ===> " + convert(w.maxTimestamp()))
>                     println(w)
> //                    var reduce: Long = 0
> //                    for(e <- iter){
> //                        reduce += e.substring(0, e.length - 4).toInt
> //                    }
> //                    println("reduce ==> " + reduce)
>                     coll.collect("aggreation")
>                 }).setParallelism(1).print().setParallelism(1)
> 
>         env.execute()
>     }
> 
>     def convert(time: Long): String = {
>         new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
>     }
> }

Reply via email to