Hi Sunitha, the approach you are describing sounds like you want to use a session window. [1] If you only want to count them if they happen at the same hour then, you want to use a tumbling window.
Your datastream approach looks solid. For SQL, there is also a session (and tumbling) window [2]. You can see examples at the bottom of the section. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com < s_penakalap...@yahoo.com> wrote: > Hi All, > > I have one requirement where I need to calculate total amount of > transactions done by each each user in last 1 hour. > Say Customer1 has done 2 transactions one at 11:00am and other one at > 11:20 am. > Customer2 has done 1 transaction one at 10:00 am > Customer3 has done 3 transactions one at 11:20 am, 11:40 am and 11:45 am. > > when ever customer does a transaction then we receive an event in source > topic, we consume the data and need to calculate the total amount spent by > Customer in last 1 hour. > > if I have received customer1 new transaction event at 11:30 am then I need > to calculate the sum of 3 events done by customer1 in last 1 hour (i.e > 11:00 , 11:20 and 11:30 am - all 3 events fall in last 1 hour window) > Now say I receive Customer2 new transaction event at 11:30 am then for > this customer I need to consider only one event 11:30 (ignoring the event > at 10:00 am as it does not fall in last 1 hr) > Customer3 new transaction is done at 12:40 pm then for this Customer I > need to calculate sum of ( 11:40 am , 11:45 am and 12:40pm) as all 3 fall > under last 1 hr. > > Approach I am planning to try: > Every event has the transaction time which I am using as event time to > assign WatermarkStrategy > KeyBy - customerId > SlidingEventTimeWindows of 1 hr > then process all elements using ProcessWindowFunction > > > Kindly suggest the approach I need to follow to achieve the above scenario > using Flink Java /Sql. I am using Flink 1.12.0. > > Regards, > Sunitha >