Ah, cool. I was thinking a timer registered at T would be triggered at
T+1ms.

On Mon, Jul 16, 2018 at 7:26 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> On a side note: even if we fix this off-by-one bug, I think there can
> still be races, because the current processing time is queried using
> System.currentTimeMillis() while timers are set using a
> ScheduledThreadPoolExecutor (currently). Any race between those two can
> also produce weird results.
>
> For these reasons, I would always suggest going with event time or
> ingestion time, but I think the latter is currently not possible with the
> Table API/SQL.
>
>
> On 16. Jul 2018, at 11:39, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> I think there is a bug in how processing-time timers work. For event time,
> we fire timers when the watermark is >= the timestamp. This is correct
> because a watermark T says that we will not see any more elements with a
> timestamp *smaller than or equal* to T. For processing time, a current time
> of T does not say that we won't see an element with timestamp T, so the
> triggering behaviour is wrong for processing time. I created a Jira issue
> for this:
> https://issues.apache.org/jira/browse/FLINK-9857
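A minimal sketch of the arithmetic involved (assuming the usual tumbling-window assignment, start = timestamp - (timestamp % size), and a trigger timer registered for the window's maxTimestamp = end - 1):

```java
public class ProcessingTimeOffByOne {
    // Tumbling window start for a given timestamp (zero offset), following
    // the standard assignment formula.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 2 * 60 * 1000L; // 2-minute window
        long ts = 1531448399978L;   // timestamp seen in the extra records below

        long start = windowStart(ts, size);
        long end = start + size;
        long maxTimestamp = end - 1; // the trigger's timer is set for this instant

        System.out.println("start=" + start);               // 1531448280000
        System.out.println("maxTimestamp=" + maxTimestamp); // 1531448399999

        // An element whose processing time is still <= maxTimestamp can arrive
        // AFTER the timer for maxTimestamp has fired; it is assigned to the
        // same window, which then fires again with a tiny count.
        System.out.println(ts <= maxTimestamp); // true
    }
}
```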
>
> Best,
> Aljoscha
>
> On 16. Jul 2018, at 07:36, Yuan,Youjun <yuanyou...@baidu.com> wrote:
>
> Hi Hequn,
>
> To my understanding, a processing-time window is fired at the last
> millisecond of the window (maxTimestamp). Then what will happen if more
> elements arrive *at the last millisecond, but AFTER the window is fired*?
>
> Thanks,
> Youjun
> *From:* Hequn Cheng <chenghe...@gmail.com>
> *Sent:* Friday, July 13, 2018 9:44 PM
> *To:* Yuan,Youjun <yuanyou...@baidu.com>
> *Cc:* Timo Walther <twal...@apache.org>; user@flink.apache.org
> *Subject:* Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a
> same window
>
> Hi Youjun,
>
> The rowtime value in the udf EXTRACT(EPOCH FROM rowtime) is different from
> the rowtime value of the window. The SQL is parsed and translated into a
> chain of nodes: Source -> Calc -> Window -> Sink. The Calc is the input node
> of the Window, and the udf is part of the Calc instead of the Window. So
> max_ts and min_ts are actually times taken before entering the window,
> i.e., not the time in the window.
>
> However, I still can't find anything that would solve the problem. It
> seems the window has been triggered many times for the same key. I will
> think more about it.
>
> Best, Hequn.
>
> On Fri, Jul 13, 2018 at 11:53 AM, Yuan,Youjun <yuanyou...@baidu.com>
> wrote:
>
> Hi Hequn,
>
> I am using Flink 1.4. The job was running with parallelism 1.
>
> I don’t think the extra records are caused by different keys, because:
>
>    1. I ran 2 jobs consuming the same source, jobA with a 2-minute window
>    and jobB with a 4-minute window. If there were weird keys, then jobA
>    would get no more records than jobB for the same period. But that is not
>    true: *jobA got 17* records while *jobB got 11*. The relevant results
>    can be found below.
>    2. For each window, I output the *min and max timestamp*, and found
>    that those extra records always start at the last few milliseconds of
>    the 2- or 4-minute window, just before the window is closed. I also
>    noticed the windows do not have a clean cut between minutes; as we can
>    see in jobA’s output, ts *1531448399978* appears in 18 result records,
>    either as start, or end, or both.
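As a sanity check on point 2 (a sketch using the standard tumbling-window start formula, start = ts - ts % size): the millisecond 1531448399978 sits just before a window boundary that the 2-minute and 4-minute jobs share, which is consistent with both jobs producing their extra records at exactly that timestamp:

```java
public class SharedBoundary {
    // Exclusive end of the tumbling window containing ts (zero offset).
    static long windowEnd(long ts, long size) {
        return ts - (ts % size) + size;
    }

    public static void main(String[] args) {
        long ts = 1531448399978L;
        long end2 = windowEnd(ts, 2 * 60 * 1000L); // jobA, 2-minute windows
        long end4 = windowEnd(ts, 4 * 60 * 1000L); // jobB, 4-minute windows

        // Both jobs close a window at the same boundary, 1531448400000...
        System.out.println(end2 == end4); // true
        // ...and ts is only 22 ms before that boundary.
        System.out.println(end2 - ts);    // 22
    }
}
```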
>
>
> jobA(2-minute window) output
> {"timestamp":1531448040000,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
> {"timestamp":1531448160000,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
> {"timestamp":1531448280000,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448400000,"cnt":1593435,"userId":"user01","min_ts":1531448399978,"max_ts":1531448519978}
>
> jobB(4-minute window) output
> {"timestamp":1531447920000,"cnt":3306838,"userId":"user01","min_ts":1531447919981,"max_ts":1531448159975}
> {"timestamp":1531448160000,"cnt":3278178,"userId":"user01","min_ts":1531448159098,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":4,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":5,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":8,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":7,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448400000,"cnt":3226735,"userId":"user01","min_ts":1531448399978,"max_ts":1531448639916}
>
> Thanks
> Youjun
>
> *From:* Hequn Cheng <chenghe...@gmail.com>
> *Sent:* Thursday, July 12, 2018 11:31 PM
> *To:* Yuan,Youjun <yuanyou...@baidu.com>
> *Cc:* Timo Walther <twal...@apache.org>; user@flink.apache.org
> *Subject:* Re: Re: TumblingProcessingTimeWindow emits extra results for a same
> window
>
> Hi Yuan,
>
> Haven't heard about this before. Which Flink version do you use? The
> cause may be:
> 1. The userIds are not 100% identical, e.g. contain invisible characters.
> 2. The machine clock jumped.
>
> Otherwise, there is some bug we don't know about.
>
> Best, Hequn
>
> On Thu, Jul 12, 2018 at 8:00 PM, Yuan,Youjun <yuanyou...@baidu.com> wrote:
>
> Hi Timo,
>
> This problem happens 4-5 times a day on our online server, with ~15k
> events per second of load, and it is using PROCESSING time, so I don’t
> think I can reliably reproduce the issue on my local machine.
> The user ids are actually the same; I have double-checked that.
>
> Now I am wondering: could it be possible that, after a window fires, some
> last events arrive that still fall into the time range of the just-fired
> window, hence new windows are assigned and fired later? This would explain
> why the extra records always contain only a few events (cnt is small).
>
> To verify that, I modified the SQL to also output the MIN timestamp of
> each window, and I found that the MIN timestamps of the *extra records
> always point to the LAST second of the window*.
> Here is the output I just got; note that *1531395119* is the last second
> of a 2-minute window starting from *1531395000*.
> ----------------------------------------------------------------------
> {"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
> {"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
> {"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
> {"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
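The "last second" observation checks out arithmetically (a quick sketch; EPOCH here is in seconds):

```java
public class LastSecondCheck {
    public static void main(String[] args) {
        long windowStartSec = 1531395000L; // TUMBLE_START of the window, in seconds
        long windowSizeSec = 2 * 60;       // 2-minute window

        // Last whole second covered by the window [start, start + size).
        long lastSec = windowStartSec + windowSizeSec - 1;
        System.out.println(lastSec); // 1531395119, the min_sec of the extra records
    }
}
```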
>
> The modified SQL:
> INSERT INTO sink
> SELECT
>   TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>   COUNT(vehicleId) AS cnt,
>   userId,
>   *MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec*
> FROM source
> GROUP BY
>   TUMBLE(rowtime, INTERVAL '2' MINUTE),
>   userId
>
> thanks
> Youjun
>
> *From:* Timo Walther <twal...@apache.org>
> *Sent:* Thursday, July 12, 2018 5:02 PM
> *To:* user@flink.apache.org
> *Subject:* Re: TumblingProcessingTimeWindow emits extra results for a same
> window
>
> Hi Yuan,
>
> this sounds indeed weird. The SQL API uses regular DataStream API windows
> underneath, so this problem should have come up earlier if it were a
> problem in the implementation. Is this behavior reproducible on your local
> machine?
>
> One thing that comes to my mind is that the "userId"s might not be 100%
> identical (same hashCode/equals method), because otherwise they would be
> properly grouped.
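Timo's point can be illustrated outside Flink with a hypothetical key class (names made up for illustration): without equals/hashCode overrides, hash-based grouping falls back to reference identity, so equal-looking keys form separate groups:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key type WITHOUT equals/hashCode overrides.
class UserKey {
    final String userId;
    UserKey(String userId) { this.userId = userId; }
}

public class KeyGrouping {
    public static void main(String[] args) {
        Map<UserKey, Integer> counts = new HashMap<>();
        // Two records with the "same" userId...
        counts.merge(new UserKey("user01"), 1, Integer::sum);
        counts.merge(new UserKey("user01"), 1, Integer::sum);
        // ...end up in two separate groups instead of one.
        System.out.println(counts.size()); // 2
    }
}
```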
>
> Regards,
> Timo
>
> On 12.07.18 at 09:35, Yuan,Youjun wrote:
>
> Hi community,
>
> I have a job which counts the event number every 2 minutes, with a
> TumblingWindow in ProcessingTime. However, it occasionally produces extra
> DUPLICATED records. For instance, for timestamp 1531368480000 below, it
> emits a normal result (cnt=1641161), followed by a few more records with
> very small results (2, 3, etc).
>
> Can anyone shed some light on the possible reason, or how to fix it?
>
> Below are the sample output.
> -----------------------------------------------------------
> {"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
> {"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
> {"timestamp":*1531368480000*,"cnt":1641161,"userId":"user01"}
> {"timestamp":*1531368480000*,"cnt":2,"userId":"user01"}
> {"timestamp":*1531368480000*,"cnt":3,"userId":"user01"}
> {"timestamp":*1531368480000*,"cnt":3,"userId":"user01"}
>
> And here is the job SQL:
> -----------------------------------------------------------
> INSERT INTO sink
> SELECT
>   TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>   COUNT(vehicleId) AS cnt,
>   userId
> FROM source
> GROUP BY
>   TUMBLE(rowtime, INTERVAL '2' MINUTE),
>   userId
>
> Thanks,
> Youjun Yuan
>
>
>
>
