Hi Yennie,

You might want to have a look at the OVER windows of Flink's Table API or SQL [1].
An OVER window computes an aggregate (such as a count) for each incoming record over a range of preceding events. For example, the query

  SELECT
    ip,
    successful,
    COUNT(*) OVER (
      PARTITION BY ip, successful
      ORDER BY loginTime
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
  FROM logins;

computes, for each login attempt, the number of login attempts in the preceding hour. There is no corresponding built-in operator in the DataStream API, but SQL and Table API queries can be integrated with DataStream programs very easily [2].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

2018-07-06 8:01 GMT+02:00 YennieChen88 <chenyanyi...@jd.com>:
> Hi Kostas and Rong,
> Thank you for your replies. Since both of you asked for more information
> about my use case, I will answer you together.
> My job counts the number of successful and failed logins within one
> hour, keyed by other login-related attributes (e.g. ip, device, login
> type, ...). Based on the counts of the previous hour, it judges whether
> the next login attempt is compliant.
> We have strict latency requirements for the Flink computation, to limit
> the impact on user login: from receiving a source record to sinking the
> result into the database, only about 10 ms is acceptable. Given that, we
> want the computed result to be as accurate as possible. The ideal case
> is that the latest sink time after the 1-hour computation exactly
> matches the login time that needs the compliance check. Does that mean
> the smaller the step size of the sliding window, the more accurate the
> results? But right now it seems that the smaller the step size, the
> longer the computation takes, because once an element arrives, it is
> processed serially by one thread in every window it belongs to (number
> of windows = window size / step size).
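The semantics of the OVER-window query above can be illustrated with a small plain-Python sketch (this is not Flink API code; the function name and event layout are my own for illustration). For each incoming login it counts, per (ip, successful) key, the events whose timestamp lies within the preceding hour, mirroring RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def over_window_counts(logins, window=timedelta(hours=1)):
    """For each login (ip, successful, loginTime), emit the count of logins
    with the same (ip, successful) key whose time falls in
    [loginTime - window, loginTime]. Input is assumed ordered by loginTime,
    as ORDER BY loginTime implies for the OVER window."""
    by_key = defaultdict(list)  # (ip, successful) -> timestamps still in range
    results = []
    for ip, successful, t in logins:
        times = by_key[(ip, successful)]
        times.append(t)
        # Drop timestamps that have fallen out of the 1-hour range.
        while times and times[0] < t - window:
            times.pop(0)
        results.append((ip, successful, t, len(times)))
    return results
```

For example, two successful logins from the same IP 30 minutes apart yield counts 1 and 2, while a failed login from that IP starts its own count, because PARTITION BY includes the successful flag.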
> > Rong Rong wrote
> > Hi Yennie,
> >
> > AFAIK, the sliding window will in fact duplicate elements into multiple
> > different streams. There's a discussion thread regarding this [1].
> > We are looking into some performance improvements. Can you provide some
> > more info regarding your use case?
> >
> > --
> > Rong
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-7001
> >
> > On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas <k.kloudas@> wrote:
> >
> >> Hi,
> >>
> >> You are correct that with sliding windows you will have 3600 "open
> >> windows" at any point.
> >> Could you describe a bit more what you want to do?
> >>
> >> If you simply want to have an update of something like a counter every
> >> second, then you can implement your own logic with a ProcessFunction,
> >> which allows you to handle state and timers in a custom way (see [1]).
> >>
> >> Hope this helps,
> >> Kostas
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
> >>
> >> On Jul 5, 2018, at 12:12 PM, YennieChen88 <chenyanying3@> wrote:
> >>
> >> Hi,
> >> I want to use sliding windows with a 1-hour window size and a 1-second
> >> step size. I found that once an element arrives, it is processed in
> >> 3600 windows serially by one thread. It takes several seconds to finish
> >> processing one element, much more than I expected. Is there any way to
> >> optimize this?
> >> Thank you very much for your reply.
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
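Kostas's ProcessFunction suggestion comes down to keeping a single piece of keyed state instead of 3600 overlapping sliding windows. A plain-Python sketch of that idea (not Flink code; the class and method names are invented for illustration): keep one deque of event timestamps per key, append each new event, evict everything older than one hour, and emit the deque length. Each element is then touched an amortized constant number of times, rather than once per window:

```python
from collections import defaultdict, deque

class PerKeyHourlyCounter:
    """One timestamp deque per key, evicted on each update; stands in for
    keyed ProcessFunction state, replacing 3600 overlapping windows."""

    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.events = defaultdict(deque)  # key -> timestamps within the window

    def add_and_count(self, key, timestamp):
        """Record one event (timestamps per key assumed non-decreasing)
        and return the count for `key` over the last `window` seconds."""
        q = self.events[key]
        q.append(timestamp)
        # Evict timestamps that fell out of the window; each timestamp is
        # appended once and evicted at most once, so the cost is O(1) amortized.
        while q and q[0] <= timestamp - self.window:
            q.popleft()
        return len(q)
```

For an update every second, as in Kostas's example, a real ProcessFunction would additionally register a processing-time timer to re-emit the count even when no new event arrives; that part is omitted here.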