I haven't dug too deeply into the content, but it seems this line is the
reason:

                .keyBy(s => s.endsWith("FRI"))

Essentially, you are creating two key partitions (true, false), each of
which has its own sliding window, I believe. Can you print out the key
space for each window?
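
In case it helps, here is a minimal sketch of how the key could be printed
with each window firing. It assumes the same Flink 1.4.x Scala API as the
quoted test code; the object name PrintWindowKeys and the Thread.sleep
throttle are additions for illustration only and are not part of the
original test.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Sketch only: same pipeline as the quoted test, but each firing reports its key.
object PrintWindowKeys {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.addSource((context: SourceContext[String]) => {
                    while (true) {
                        context.collect(new Random().nextInt(100) + ":FRI")
                        Thread.sleep(10) // assumption: throttle the source a little
                    }
                })
                .keyBy(s => s.endsWith("FRI"))
                .timeWindow(Time.minutes(1), Time.seconds(5))
                // The first argument of the window function is the key from keyBy.
                .apply((key: Boolean, w: TimeWindow, iter: Iterable[String],
                        coll: Collector[String]) => {
                    coll.collect("key=" + key + " window=" + w + " elements=" + iter.size)
                })
                .setParallelism(1).print().setParallelism(1)

        env.execute()
    }
}
```

If two different keys show up for the same window, that would explain the
repeated output. If only one key ever appears, the cause is somewhere else.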

--
Rong

On Sun, Sep 16, 2018 at 1:23 AM 远远 <zhao137578...@gmail.com> wrote:

>
>
> ---------- Forwarded message ---------
> From: 远远 <zhao137578...@gmail.com>
> Date: Sun, Sep 16, 2018 at 4:08 PM
> Subject: Re: why same Sliding ProcessTime TimeWindow triggered twice
> To: <xingc...@gmail.com>
>
>
> Hi, the Flink version I tested is 1.4.2. I just ran the test code in a
> local environment in IDEA, and all the settings are in the test code.
> My OS is deepin (a Debian-based Linux) 15.7...
>
> I tried again, and the output is as follows:
> now   ===> 2018-09-16 16:06:09
> start ===> 2018-09-16 16:05:10
> end   ===> 2018-09-16 16:06:10
> max   ===> 2018-09-16 16:06:09
> TimeWindow{start=1537085110000, end=1537085170000}
> aggreation
> now   ===> 2018-09-16 16:06:09
> start ===> 2018-09-16 16:05:10
> end   ===> 2018-09-16 16:06:10
> max   ===> 2018-09-16 16:06:09
> TimeWindow{start=1537085110000, end=1537085170000}
> aggreation
> now   ===> 2018-09-16 16:06:16
> start ===> 2018-09-16 16:05:15
> end   ===> 2018-09-16 16:06:15
> max   ===> 2018-09-16 16:06:14
> TimeWindow{start=1537085115000, end=1537085175000}
> aggreation
> now   ===> 2018-09-16 16:06:19
> start ===> 2018-09-16 16:05:20
> end   ===> 2018-09-16 16:06:20
> max   ===> 2018-09-16 16:06:19
> TimeWindow{start=1537085120000, end=1537085180000}
> aggreation
> now   ===> 2018-09-16 16:06:20
> start ===> 2018-09-16 16:05:20
> end   ===> 2018-09-16 16:06:20
> max   ===> 2018-09-16 16:06:19
> TimeWindow{start=1537085120000, end=1537085180000}
> aggreation
> now   ===> 2018-09-16 16:06:24
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:24
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:25
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:29
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:29
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:30
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:36
> start ===> 2018-09-16 16:05:35
> end   ===> 2018-09-16 16:06:35
> max   ===> 2018-09-16 16:06:34
> TimeWindow{start=1537085135000, end=1537085195000}
>
>
> On Sun, Sep 16, 2018 at 3:55 PM Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi,
>>
>> I’ve tested your code in my local environment and everything worked fine.
>> It’s a little weird to see your output like that. I wonder if you could
>> give more information about your environment, e.g., your flink version and
>> execution settings.
>>
>> Thanks,
>> Xingcan
>>
>> On Sep 16, 2018, at 3:19 PM, 远远 <zhao137578...@gmail.com> wrote:
>>
>> Hi everyone,
>> Today I tested a sliding processing-time TimeWindow and printed some of
>> its properties. I found that the same sliding window was printed twice,
>> as follows:
>>
>> now   ===> 2018-09-16 15:11:44
>> start ===> 2018-09-16 15:10:45
>> end   ===> 2018-09-16 15:11:45
>> max   ===> 2018-09-16 15:11:44
>> TimeWindow{start=1537081845000, end=1537081905000}
>> aggreation
>> now   ===> 2018-09-16 15:11:45
>> start ===> 2018-09-16 15:10:45
>> end   ===> 2018-09-16 15:11:45
>> max   ===> 2018-09-16 15:11:44
>> TimeWindow{start=1537081845000, end=1537081905000}
>> aggreation
>>
>> But when I add a sum operation, this does not happen. I want to know why.
>> Thanks.
>>
>> My test code is:
>>
>> // Imports assumed for this snippet (not in the original message):
>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.util.Collector
>> import org.joda.time.DateTime
>> import scala.util.Random
>>
>> object SlidingProcessTimeWindowTest {
>>
>>     def main(args: Array[String]): Unit = {
>>         val env = StreamExecutionEnvironment.getExecutionEnvironment
>>         env.addSource((context: SourceContext[String]) => {
>>                     while (true) context.collect(new Random().nextInt(100) + ":FRI")
>>                 })
>>                 .keyBy(s => s.endsWith("FRI"))
>>                 .timeWindow(Time.minutes(1), Time.seconds(5))
>>                 .apply((e, w, iter, coll: Collector[String]) => {
>>                     println("now   ===> " + convert(DateTime.now().getMillis))
>>                     println("start ===> " + convert(w.getStart))
>>                     println("end   ===> " + convert(w.getEnd))
>>                     println("max   ===> " + convert(w.maxTimestamp()))
>>                     println(w)
>> //                    var reduce: Long = 0
>> //                    for(e <- iter){
>> //                        reduce += e.substring(0, e.length - 4).toInt
>> //                    }
>> //                    println("reduce ==> " + reduce)
>>                     coll.collect("aggreation")
>>                 }).setParallelism(1).print().setParallelism(1)
>>
>>         env.execute()
>>     }
>>
>>     def convert(time: Long): String = {
>>         new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
>>     }
>> }
>>
>>
>>
