Thanks Arvid, very helpful.
On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Swagat,
>
> 1. Where the data primarily resides depends on the chosen state backend
> [1]. In most cases, it's written to some file with a memory cache. It's
> possible to query the state [2], but not with SQL. In fact, it's so
> basic that we decided to drop the feature in the future to make room
> for a more sophisticated solution based around replicating the state to
> an external queryable form, but there is nothing specific yet.
> 2. It would help if you (re)read the section about state persistence
> [3]. Basically, the state is updated on every write access of the
> process function. Flink creates a checkpoint of the state periodically
> and can recover from these checkpoints. It's also possible to look into
> these checkpoints with the state processor API [4].
> 3. It's embedded. See above for what happens on failure.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> On 2: I was referring to stateful functions as an alternative to
>> windows, but in this particular use case it's not an exact fit, I
>> think, though a solution can be built around it.
>>
>> On the overall approach, what's the right way to use Flink SQL? What I
>> have currently is:
>>
>> - every event has a transaction time, which I am using as the event
>>   time to assign a WatermarkStrategy
>> - keyBy customerId
>> - SlidingEventTimeWindows of 1 hr
>> - then process all elements using a ProcessWindowFunction
>>
>> Extending the above:
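As an aside on the SlidingEventTimeWindows step above: each event is assigned to size/slide overlapping windows. A minimal plain-Java sketch of that assignment arithmetic (not Flink code; timestamps are minutes since midnight, and the 30-minute slide is an assumed value, since the thread only fixes the 1-hour size):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssigner {
    /**
     * Returns the start timestamps of all sliding windows containing the
     * given event timestamp, newest window first. This mirrors the basic
     * arithmetic of sliding event-time windows (no offset handling).
     */
    public static List<Long> assignWindows(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start); // event falls in [start, start + size)
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event at 11:40 (minute 700) with a 1 h window sliding every 30 min
        // lands in the windows starting at 11:30 (690) and 11:00 (660).
        System.out.println(assignWindows(700, 60, 30)); // [690, 660]
    }
}
```

With a 30-minute slide, every event belongs to exactly two open windows per key, which is what makes sliding windows roughly double the state of tumbling ones.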
>>
>> For the session window, taking the above example, reiterated below:
>>
>> Say Customer1 has done 2 transactions, one at 11:00 am and the other
>> at 11:20 am.
>> Customer2 has done 1 transaction, at 10:00 am.
>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>
>> 1-hour windows:
>> 9:30 AM - 10:30 AM: Customer2
>> 10:30 AM - 11:30 AM: Customer1, Customer3
>> 11:30 AM - 12:30 PM: Customer3
>>
>> Questions - how do we access the state?
>>
>> 1. Will the process window function write to an in-memory SQL table
>> that does not get flushed to a proper backing database, so all the
>> data stays in memory? If yes, can that be queried?
>> 2. If the process window function writes to a proper backing database,
>> at what point should this happen? Because the API can query the state
>> at any point in time, the data that was flushed might be stale and
>> need recomputation.
>> 3. How do you recommend RocksDB be used as a state backend? Is that
>> the embedded RocksDB, or do you recommend an external implementation?
>> The embedded RocksDB state is lost when the container is restarted, I
>> guess, so we will have to have an external mechanism for restart/crash
>> recovery?
>>
>> Regards,
>> Swagat
>>
>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> 1. It always depends on the data volume per user. A million users is
>>> not much if you compare it to the biggest Flink installations
>>> (Netflix, Alibaba, Pinterest, ...). However, for larger scale and
>>> scalability, I'd recommend the RocksDB state backend [1].
>>>
>>> 2. Are you referring to StateFun? I'd say that for your use case,
>>> Flink is a better fit. StateFun is more suitable when each actor
>>> (= user in your case) acts differently depending on the data, as in
>>> a state machine.
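The three hour buckets listed above (9:30-10:30, 10:30-11:30, 11:30-12:30) correspond to 1-hour windows offset by 30 minutes. A small plain-Java sketch reproducing that grouping (timestamps as minutes since midnight; the 30-minute offset is inferred from the example, not stated in the thread):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class HourBuckets {
    static final long SIZE = 60, OFFSET = 30; // 1 h windows starting on the half hour

    /** Start of the 1-hour window (with 30-min offset) containing the timestamp. */
    public static long windowStart(long ts) {
        return ts - ((ts - OFFSET) % SIZE + SIZE) % SIZE;
    }

    public static void main(String[] args) {
        long[][] txns = { // {customerId, minutesSinceMidnight}
            {1, 660}, {1, 680},          // Customer1: 11:00, 11:20
            {2, 600},                    // Customer2: 10:00
            {3, 680}, {3, 700}, {3, 705} // Customer3: 11:20, 11:40, 11:45
        };
        Map<Long, Set<Long>> customersPerWindow = new TreeMap<>();
        for (long[] t : txns) {
            customersPerWindow
                .computeIfAbsent(windowStart(t[1]), k -> new TreeSet<>())
                .add(t[0]);
        }
        System.out.println(customersPerWindow);
        // {570=[2], 630=[1, 3], 690=[3]}
        // i.e. 9:30 -> {C2}, 10:30 -> {C1, C3}, 11:30 -> {C3}, as in the thread
    }
}
```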
>>> In your case, your users should be processed in the same way: even
>>> if the windows are independently opened and closed, every user has
>>> at most one window open at a given event time. You probably also
>>> aggregate all user states more or less in the same way.
>>>
>>> Or did you refer to process functions with state? That's certainly
>>> possible to implement, but it won't be much faster unless you can
>>> exploit some specific properties of your application. An example is
>>> written up in [2]. I'd recommend first using the regular, built-in
>>> windows and only switching to custom code if the performance is
>>> insufficient. Custom implementations may be faster now, but they
>>> come with a higher maintenance cost, and the built-in windows may be
>>> better optimized in the future.
>>>
>>> Lastly, if your query is of a relational nature, I'd recommend having
>>> a look at the Table API/SQL [3]. Unless you really invest a lot of
>>> time, you won't be able to write more efficient code than what the
>>> Table API generates.
>>>
>>> [1] https://flink.apache.org/2021/01/18/rocksdb.html
>>> [2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>>>
>>> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <swaga...@gmail.com> wrote:
>>>
>>>> 1. What if there is a very high number of users, like a million
>>>> customers - won't the service crash? Is it advisable to hold the
>>>> data in memory?
>>>>
>>>> 2. What if stateful functions are used to calculate the value? How
>>>> would this approach differ from the one proposed below?
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Sunitha,
>>>>>
>>>>> The approach you are describing sounds like you want to use a
>>>>> session window.
>>>>> [1] If you only want to count transactions that happen within the
>>>>> same hour, then you want to use a tumbling window.
>>>>>
>>>>> Your DataStream approach looks solid.
>>>>>
>>>>> For SQL, there is also a session (and tumbling) window [2]. You can
>>>>> see examples at the bottom of that section.
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>>>
>>>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <s_penakalap...@yahoo.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have one requirement where I need to calculate the total amount
>>>>>> of transactions done by each user in the last 1 hour.
>>>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the
>>>>>> other at 11:20 am.
>>>>>> Customer2 has done 1 transaction, at 10:00 am.
>>>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and
>>>>>> 11:45 am.
>>>>>>
>>>>>> Whenever a customer does a transaction, we receive an event in the
>>>>>> source topic; we consume the data and need to calculate the total
>>>>>> amount spent by the customer in the last 1 hour.
>>>>>>
>>>>>> If I receive a new Customer1 transaction event at 11:30 am, then I
>>>>>> need to calculate the sum of the 3 events done by Customer1 in the
>>>>>> last 1 hour (i.e. 11:00, 11:20 and 11:30 am - all 3 events fall in
>>>>>> the last 1-hour window).
>>>>>> Now say I receive a new Customer2 transaction event at 11:30 am;
>>>>>> for this customer I need to consider only the one event at 11:30
>>>>>> (ignoring the event at 10:00 am, as it does not fall in the last
>>>>>> 1 hr).
>>>>>> If Customer3's new transaction is done at 12:40 pm, then for this
>>>>>> customer I need to calculate the sum of (11:40 am, 11:45 am and
>>>>>> 12:40 pm), as all 3 fall under the last 1 hr.
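The "total spent in the last 1 hour as of each new event" semantics described above can be modelled per customer as a buffer that is pruned on every event. This is a plain-Java sketch of the semantics only, not Flink code; in actual Flink this state would live in keyed state inside a process function, or be expressed as an OVER window in SQL:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LastHourSum {
    private static final long WINDOW = 60;                   // trailing 60 minutes
    private final Deque<long[]> buffer = new ArrayDeque<>(); // {timestamp, amount}
    private long sum;

    /** Adds one transaction and returns the total over the trailing hour. */
    public long onTransaction(long ts, long amount) {
        buffer.addLast(new long[] {ts, amount});
        sum += amount;
        // Evict transactions older than one hour relative to the new event.
        while (ts - buffer.peekFirst()[0] > WINDOW) {
            sum -= buffer.removeFirst()[1];
        }
        return sum;
    }

    public static void main(String[] args) {
        // Customer2: 10:00 (minute 600) then 11:30 (690) -- the 10:00
        // transaction is older than one hour and drops out of the total.
        LastHourSum customer2 = new LastHourSum();
        customer2.onTransaction(600, 50);
        System.out.println(customer2.onTransaction(690, 20)); // 20
    }
}
```

One instance of this buffer per customerId is exactly the kind of per-key state the thread discusses keeping in a state backend.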
>>>>>>
>>>>>> The approach I am planning to try:
>>>>>> - Every event has a transaction time, which I am using as the
>>>>>>   event time to assign a WatermarkStrategy
>>>>>> - keyBy customerId
>>>>>> - SlidingEventTimeWindows of 1 hr
>>>>>> - then process all elements using a ProcessWindowFunction
>>>>>>
>>>>>> Kindly suggest the approach I need to follow to achieve the above
>>>>>> scenario using Flink Java/SQL. I am using Flink 1.12.0.
>>>>>>
>>>>>> Regards,
>>>>>> Sunitha
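Since session windows were suggested earlier in the thread as an alternative, here is a minimal plain-Java sketch of their merging semantics (illustrative only, not Flink's implementation; the 30-minute gap is an assumed value, not something specified in the thread):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindows {
    /**
     * Merges sorted event timestamps into sessions: a new session starts
     * whenever the gap to the previous event reaches the session gap.
     * Each session is returned as {firstEvent, lastEvent}.
     */
    public static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] < gap) {
                last[1] = ts;                    // extend the current session
            } else {
                result.add(new long[] {ts, ts}); // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Customer3's transactions at 11:20, 11:40, 11:45 (minutes since
        // midnight) merge into a single session with a 30-minute gap.
        List<long[]> s = sessions(new long[] {680, 700, 705}, 30);
        System.out.println(s.size() + " session(s), spanning "
                + s.get(0)[0] + ".." + s.get(0)[1]);
        // 1 session(s), spanning 680..705
    }
}
```

This also shows why session windows do not match the "last 1 hour as of each event" requirement directly: a session's extent depends on the gaps between events, not on a fixed trailing interval.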