+1. Yes, your use case would probably fit best with the OVER aggregate approach.
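To make that concrete, here is a rough, untested sketch (Flink 1.5, Java) of
how the OVER query Fabian shows further down this thread could be plugged
into a DataStream program. The sample records, field names, and watermark
setup are placeholders made up for illustration:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class OverAggregateSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // Hypothetical login stream: (ip, successful, login time in epoch millis).
    DataStream<Tuple3<String, Boolean, Long>> logins = env
        .fromElements(
            Tuple3.of("10.0.0.1", true, 1530867600000L),
            Tuple3.of("10.0.0.1", false, 1530867660000L))
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Tuple3<String, Boolean, Long>>() {
              @Override
              public long extractAscendingTimestamp(
                  Tuple3<String, Boolean, Long> e) {
                return e.f2;
              }
            });

    // Register the stream as a table; '.rowtime' turns the third field into
    // the event-time attribute 'loginTime'.
    tEnv.registerDataStream("logins", logins,
        "ip, successful, loginTime.rowtime");

    // Fabian's OVER aggregate: per-record count over the preceding hour,
    // partitioned by (ip, successful).
    Table counts = tEnv.sqlQuery(
        "SELECT ip, successful, " +
        "  COUNT(*) OVER (PARTITION BY ip, successful ORDER BY loginTime " +
        "    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt " +
        "FROM logins");

    // OVER aggregates produce an append-only result, so the table converts
    // straight back to a DataStream.
    tEnv.toAppendStream(counts, Row.class).print();

    env.execute("over-aggregate-sketch");
  }
}

Unlike a 1-hour/1-second sliding window, this should evaluate each record
once per partition rather than copying it into 3600 overlapping windows.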
I actually created a complementary note
<https://docs.google.com/document/d/1cUd47_m5DOiti8KpWHfkd3PBNLe-ESgEgTaNhj0Tr6c/edit?usp=sharing>
for myself covering some of the complex aggregate components on top of the
official Flink SQL/Table API doc [1]. It may help you better understand how
the OVER aggregate method could fit into your use case. Let me know if it is
helpful :-)

@Fabian, if possible, please share some comments on the note when you have
time. :-)

Thanks,
Rong

On Fri, Jul 6, 2018 at 2:30 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yennie,
>
> You might want to have a look at the OVER windows of Flink's Table API or
> SQL [1].
>
> An OVER window computes an aggregate (such as a count) for each incoming
> record over a range of previous events.
> For example, the query:
>
> SELECT ip, successful, COUNT(*) OVER (PARTITION BY ip, successful ORDER BY
> loginTime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
> FROM logins;
>
> computes for each login attempt the number of login attempts within the
> previous hour.
>
> There is no corresponding built-in operator in the DataStream API, but SQL
> and Table API queries can be integrated very easily with DataStream
> programs [2].
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>
> 2018-07-06 8:01 GMT+02:00 YennieChen88 <chenyanyi...@jd.com>:
>
>> Hi Kostas and Rong,
>> Thank you for your replies.
>> As both of you asked for more info about my use case, I will reply to
>> both of you together.
>> My case counts the number of successful and failed logins within one
>> hour, keyed by other login-related attributes (e.g. ip, device, login
>> type ...). Based on the counts of the previous hour, we judge whether
>> the next login attempt is compliant.
>> We have strict latency requirements for the Flink computation, to
>> reduce the impact on user login: from receiving the source event to
>> sinking the result into the database, only about 10 ms is acceptable.
>> Based on this, we want the computed result to be as accurate as
>> possible. In the best case, without error, the latest result sunk after
>> the 1-hour computation corresponds exactly to the login time of the
>> attempt that needs the compliance judgment. Does that mean the smaller
>> the step size of the sliding window, the more accurate the results? But
>> right now it seems that the smaller the step size, the longer the
>> computation takes, because once an element arrives, it is processed in
>> every window (number of windows = window size / step size) serially by
>> one thread.
>>
>> Rong Rong wrote
>> > Hi Yennie,
>> >
>> > AFAIK, the sliding window will in fact duplicate elements into multiple
>> > different streams. There's a discussion thread regarding this [1].
>> > We are looking into some performance improvements. Can you provide
>> > some more info regarding your use case?
>> >
>> > --
>> > Rong
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas <k.kloudas@...> wrote:
>> >
>> >> Hi,
>> >>
>> >> You are correct that with sliding windows you will have 3600 "open
>> >> windows" at any point.
>> >> Could you describe a bit more what you want to do?
>> >>
>> >> If you simply want to have an update of something like a counter
>> >> every second, then you can implement your own logic with a
>> >> ProcessFunction, which allows you to handle state and timers in a
>> >> custom way (see [1]; a rough sketch of this idea appears at the
>> >> bottom of this thread).
>> >>
>> >> Hope this helps,
>> >> Kostas
>> >>
>> >> [1]
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>> >>
>> >> On Jul 5, 2018, at 12:12 PM, YennieChen88 <chenyanying3@...> wrote:
>> >>
>> >> Hi,
>> >> I want to use sliding windows with a window size of 1 hour and a
>> >> step size of 1 second. I found that once an element arrives, it is
>> >> processed in 3600 windows serially by one thread. It takes several
>> >> seconds to finish processing one element, much more than I expected.
>> >> Is there any way to optimize this?
>> >> Thank you very much for your reply.
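For reference, a minimal sketch of the ProcessFunction idea Kostas describes
above, assuming the stream has already been keyed by the login attributes
and carries event-time timestamps. 'LoginEvent' and all other names are
made-up placeholders, and an exact rolling count would need more bookkeeping
than shown here:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits, for every login event, the (approximate) number of logins seen for
// the current key during the last hour, without sliding-window duplication.
public class RollingHourCount extends ProcessFunction<LoginEvent, Long> {

    // One counter per key; only valid when run on a keyed stream.
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(LoginEvent login, Context ctx,
            Collector<Long> out) throws Exception {
        Long c = count.value();
        c = (c == null) ? 1L : c + 1;
        count.update(c);

        // Expire this event's contribution one hour later. Assumes event-time
        // timestamps are assigned; otherwise ctx.timestamp() is null.
        // Caveat: timers for the same key and timestamp coalesce, so events
        // sharing a timestamp would need extra bookkeeping (e.g. MapState)
        // for an exact count.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3_600_000L);

        out.collect(c); // count of logins for this key in the last hour
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Long> out) throws Exception {
        // A login from one hour ago drops out of the rolling count.
        Long c = count.value();
        if (c != null && c > 0) {
            count.update(c - 1);
        }
    }
}

Applied after keyBy(...) on the login attributes, this keeps one counter per
key and one timer per event, instead of processing every element in 3600
overlapping windows.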