Hi everyone,
Today I tested a sliding processing-time TimeWindow and printed some of its properties. I
found that the same sliding window is printed twice, as follows:

now   ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now   ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation

However, when I perform a sum operation inside the window function, this does not happen. I would like to know why.
Thanks.

My test code is:

/**
 * Small Flink streaming job used to observe when sliding time windows fire.
 *
 * An unbounded source emits strings of the form "<n>:FRI" (n in 0..99) as fast
 * as it can. The stream is keyed, cut into sliding windows of one minute that
 * advance every five seconds, and each window evaluation prints the current
 * wall-clock time together with the window's start, end and max timestamp
 * before emitting a single marker record.
 */
object SlidingProcessTimeWindowTest {

    /**
     * Builds the pipeline and runs it until the job is stopped externally.
     *
     * @param args command-line arguments; not used
     */
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.addSource((context: SourceContext[String]) => {
                    // Create the generator once per source invocation instead of
                    // allocating a new Random for every emitted element.
                    val random = new Random()
                    // Intentionally unbounded and unthrottled: the source never returns.
                    while (true) {
                        context.collect(s"${random.nextInt(100)}:FRI")
                    }
                })
                // Every element ends with "FRI", so all records share the key `true`.
                .keyBy(s => s.endsWith("FRI"))
                // Window size of one minute, sliding every five seconds.
                .timeWindow(Time.minutes(1), Time.seconds(5))
                .apply((key, window, iter, out: Collector[String]) => {
                    // NOTE(review): the same window can be reported twice. Presumably
                    // elements that arrive in the window's final millisecond, after the
                    // first firing, re-register an already-due processing-time timer and
                    // trigger a second evaluation -- confirm against Flink's
                    // WindowOperator / ProcessingTimeTrigger behaviour.
                    println(s"now   ===> ${convert(DateTime.now().getMillis)}")
                    println(s"start ===> ${convert(window.getStart)}")
                    println(s"end   ===> ${convert(window.getEnd)}")
                    println(s"max   ===> ${convert(window.maxTimestamp())}")
                    println(window)
                    // Optional experiment: sum the numeric prefix of every element in the window.
//                    var reduce: Long = 0
//                    for(e <- iter){
//                        reduce += e.substring(0, e.length - 4).toInt
//                    }
//                    println("reduce ==> " + reduce)
                    out.collect("aggreation")
                })
                .setParallelism(1)
                .print()
                .setParallelism(1)

        env.execute()
    }

    /**
     * Formats an epoch timestamp as "yyyy-MM-dd HH:mm:ss".
     *
     * @param time timestamp in milliseconds since the epoch
     * @return the formatted date-time string, as rendered by `DateTime`
     *         (presumably in the JVM's default time zone -- TODO confirm)
     */
    def convert(time: Long): String = {
        new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
    }
}

Reply via email to