On a side note: even if we change this off-by-one bug, I think there can still 
be races because current processing time is queried using 
System.currentTimeMillis() and we set timers using a 
ScheduledThreadPoolExecutor (currently). If there's any race between those two 
you can also get weird results.

For these reasons, I would always suggest to go with event time or ingestion 
time, but I think the latter is currently not possible with the Table API/SQL.


> On 16. Jul 2018, at 11:39, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I think there is a bug in how processing-time timers work. For event-time, we 
> fire timers when the watermark is >= the timestamp, this is correct because a 
> watermark T says that we will not see elements with a timestamp smaller or 
> equal to T. For processing time, a time of T does not say that we won't see 
> an element with timestamp T. Therefore the triggering behaviour is wrong for 
> processing time. I created a Jira issue for this: 
> https://issues.apache.org/jira/browse/FLINK-9857 
> <https://issues.apache.org/jira/browse/FLINK-9857>
> 
> Best,
> Aljoscha
> 
>> On 16. Jul 2018, at 07:36, Yuan,Youjun <yuanyou...@baidu.com 
>> <mailto:yuanyou...@baidu.com>> wrote:
>> 
>> Hi Hequn,
>>  
>> To my understand, a processing time window is fired at the last millisecond 
>> of the window(maxTimestamp). Then what will happen if more elements arrive 
>> at the last millisecond, but AFTER the window is fired?
>>  
>> Thanks,
>> Youjun
>> 发件人: Hequn Cheng <chenghe...@gmail.com <mailto:chenghe...@gmail.com>> 
>> 发送时间: Friday, July 13, 2018 9:44 PM
>> 收件人: Yuan,Youjun <yuanyou...@baidu.com <mailto:yuanyou...@baidu.com>>
>> 抄送: Timo Walther <twal...@apache.org <mailto:twal...@apache.org>>; 
>> user@flink.apache.org <mailto:user@flink.apache.org>
>> 主题: Re: 答复: 答复: TumblingProcessingTimeWindow emits extra results for a same 
>> window
>>  
>> Hi Youjun,
>>  
>> The rowtime value in udf:EXTRACT(EPOCH FROM rowtime) is different from the 
>> rowtime value of window. Sql will be parsed and translated into some nodes, 
>> Source -> Calc -> Window -> Sink. The Calc is the input node of Window and 
>> the udf is part of Calc instead of Window. So the max_ts and min_ts is 
>> actually the time before entering the window, i.e, not the time in window.
>>  
>> However, I still can't find anything valuable to solve the problem. It seems 
>> the window has been triggered many times for the same key. I will think more 
>> about it.
>>  
>> Best, Hequn.
>>  
>> On Fri, Jul 13, 2018 at 11:53 AM, Yuan,Youjun <yuanyou...@baidu.com 
>> <mailto:yuanyou...@baidu.com>> wrote:
>> Hi Hequn,
>>  
>> I am using Flink 1.4. The job was running with  1 parallelism.
>>  
>> I don’t think the extra records are caused by different keys, because:
>> I ran 2 jobs consuming the same source, jobA with 2-minute window, and job 
>> with 4-minute window. If there are wired keys, then jobA will get no more 
>> records than jobB, for the same period. But that not true, jobA got 17 
>> records while jobB got 11. Relevant results could be found below.
>> For each window, I output the min and max timestamp, and found that those 
>> extra records always start at the last few milliseconds of the 2 or 4-minte 
>> windows, just before window got closed. I also noticed the windows did not 
>> have a clear cut between minutes, as we can see in jobA’s output, ts 
>> 1531448399978 appears in 18 result records, either as start, or end, or both.
>>  
>> jobA(2-minute window) output
>> {"timestamp":1531448040000,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
>> {"timestamp":1531448160000,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
>> {"timestamp":1531448280000,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448400000,"cnt":1593435,"userId":"user01","min_ts":1531448399978,"max_ts":1531448519978}
>>  
>> jobB(4-minute window) output
>> {"timestamp":1531447920000,"cnt":3306838,"userId":"user01","min_ts":1531447919981,"max_ts":1531448159975}
>> {"timestamp":1531448160000,"cnt":3278178,"userId":"user01","min_ts":1531448159098,"max_ts":1531448399977}
>> {"timestamp":1531448160000,"cnt":4,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
>> {"timestamp":1531448160000,"cnt":5,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
>> {"timestamp":1531448160000,"cnt":8,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":7,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
>> {"timestamp":1531448400000,"cnt":3226735,"userId":"user01","min_ts":1531448399978,"max_ts":1531448639916}
>>  
>> Thanks
>> Youjun
>>  
>> 发件人: Hequn Cheng <chenghe...@gmail.com <mailto:chenghe...@gmail.com>> 
>> 发送时间: Thursday, July 12, 2018 11:31 PM
>> 收件人: Yuan,Youjun <yuanyou...@baidu.com <mailto:yuanyou...@baidu.com>>
>> 抄送: Timo Walther <twal...@apache.org <mailto:twal...@apache.org>>; 
>> user@flink.apache.org <mailto:user@flink.apache.org>
>> 主题: Re: 答复: TumblingProcessingTimeWindow emits extra results for a same 
>> window
>>  
>> Hi Yuan,
>>  
>> Haven't heard about this before. Which flink version do you use? The cause 
>> may be:
>> 1. userId not 100% identical, for example contains invisible characters.
>> 2. The machine clock vibrated.
>>  
>> Otherwise,  there are some bugs we don't know.
>>  
>> Best, Hequn
>>  
>> On Thu, Jul 12, 2018 at 8:00 PM, Yuan,Youjun <yuanyou...@baidu.com 
>> <mailto:yuanyou...@baidu.com>> wrote:
>> Hi Timo,
>>  
>> This problem happens 4-5 times a day on our online server, with ~15k events 
>> per second load, and it is using PROCESSING time. So I don’t think I can 
>> stably reproduce the issue on my local machine.
>> The user ids are actually the same, I have doubled checked that.
>>  
>> Now, I am wondering could it possible that, after a window fires, some last 
>> events came but that still fall to the time range of the just fired window, 
>> hence new windows are assigned, and fired later. This can explain why the 
>> extra records always contain only a few events (cnt is small).
>>  
>> To verify that, I just modified the SQL to also output the MIN timestamp of 
>> each windows, and I found the MIN timestamp of theextra records are always 
>> point to the LAST second of the window.
>> Here is the output I just got, note 1531395119 is the last second of a 
>> 2-minute window start from 1531395000.
>> --------------------------------------------------------------------------------------------------------------------------------
>> {"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
>> {"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
>> {"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
>> {"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
>> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
>>  
>> The modified SQL:
>> INSERT INTO sink
>> SELECT
>>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`, 
>>                 count(vehicleId) AS cnt, userId,
>>                 MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
>> FROM source
>> GROUP BY
>>                 TUMBLE(rowtime, INTERVAL '2' MINUTE),
>>                 userId
>>  
>> thanks
>> Youjun
>>  
>> 发件人: Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> 
>> 发送时间: Thursday, July 12, 2018 5:02 PM
>> 收件人: user@flink.apache.org <mailto:user@flink.apache.org>
>> 主题: Re: TumblingProcessingTimeWindow emits extra results for a same window
>>  
>> Hi Yuan,
>> 
>> this sounds indeed weird. The SQL API uses regular DataStream API windows 
>> underneath so this problem should have come up earlier if this is problem in 
>> the implementation. Does this behavior reproducible on your local machine?
>> 
>> One thing that comes to my mind is that the "userId"s might not be 100% 
>> identical (same hashCode/equals method) because otherwise they would be 
>> properly grouped.
>> 
>> Regards,
>> Timo
>> 
>> Am 12.07.18 um 09:35 schrieb Yuan,Youjun:
>> Hi community,
>>  
>> I have a job which counts event number every 2 minutes, with TumblingWindow 
>> in ProcessingTime. However, it occasionally produces extra DUPLICATED 
>> records. For instance, for timestamp 1531368480000 below, it emits a normal 
>> result (cnt=1641161), and then followed by a few more records with very 
>> small result (2, 3, etc).
>>  
>> Can anyone shed some light on the possible reason, or how to fix it?
>>  
>> Below are the sample output.
>> -----------------------------------------------------------
>> {"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
>> {"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
>> {"timestamp":1531368480000,"cnt":1641161,"userId":"user01"}
>> {"timestamp":1531368480000,"cnt":2,"userId":"user01"}
>> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
>> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
>>  
>> And here is the job SQL:
>> -----------------------------------------------------------
>> INSERT INTO sink
>> SELECT
>>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>>                 count(vehicleId) AS cnt,
>>                 userId
>> FROM source
>>                 GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
>>                 userId
>>  
>> Thanks,
>> Youjun Yuan
> 

Reply via email to