Hi Arvid,

I sent a separate mail titled "Session Windows - not working as expected" (to the user community).
All other details are here if you need them; closing this thread. Please
have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Thu, May 6, 2021 at 1:50 AM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi Arvid,
>
> I sent a separate mail titled "Session Windows - not working as
> expected"; closing this thread.
>
> Please have a look when you have a few minutes, much appreciated.
>
> Regards,
> Swagat
>
> On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I tried a small POC to reproduce the behaviour, but somehow I don't see
>> the process function getting called. Am I doing something wrong?
>>
>> customerStream
>>         .keyBy(Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>>         .process(new CustomAggregateFunction())
>>         .print();
>>
>> The process function looks like below:
>>
>> public class CustomAggregateFunction extends
>>         ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>>
>>     @Override
>>     public void process(String key, Context context,
>>                         Iterable<Customer> customers,
>>                         Collector<CustomerAggregate> collector) throws Exception {
>>         System.out.println("in aggregation");
>>     }
>> }
>>
>> The customer generator:
>>
>> public class CustomerGenerator implements SourceFunction<Customer> {
>>
>>     private volatile boolean isRunning = true;
>>
>>     private final String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>>
>>     @Override
>>     public void run(SourceContext<Customer> sourceContext) throws Exception {
>>         int counter = 1;
>>
>>         while (isRunning) {
>>             Customer c = new Customer(CUSTOMER_KEY[counter % 5],
>>                     LocalTime.now(), 1000);
>>             System.out.println("Writing customer: " + c);
>>             sourceContext.collect(c);
>>             Thread.sleep(1000);
>>             counter++;
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>> }
>>
>> The Customer object:
>>
>> public class Customer {
>>     private String identifier;
>>     private LocalTime eventTime;
>>     private double amount;
>>
>>     public Customer(String identifier, LocalTime eventTime, double amount) {
>>         this.identifier = identifier;
>>         this.amount = amount;
>>         this.eventTime = eventTime;
>>     }
>>
>>     public String getIdentifier() {
>>         return identifier;
>>     }
>>
>>     public LocalTime getEventTime() {
>>         return eventTime;
>>     }
>>
>>     public double getAmount() {
>>         return amount;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "Customer{" +
>>                 "identifier='" + identifier + '\'' +
>>                 ", eventTime=" + eventTime +
>>                 ", amount=" + amount +
>>                 '}';
>>     }
>> }
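>>
>> For completeness, the driver I am running this with - a minimal sketch,
>> the class name is just a placeholder; note that I am not assigning any
>> timestamps or watermarks anywhere:
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> public class SessionWindowPoc {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         // one Customer per second from the generator; no WatermarkStrategy is set
>>         env.addSource(new CustomerGenerator())
>>                 .keyBy(Customer::getIdentifier)
>>                 .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>>                 .process(new CustomAggregateFunction())
>>                 .print();
>>
>>         env.execute("session window poc");
>>     }
>> }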
>>
>> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Swagat,
>>>
>>> 1. Where the data primarily resides depends on the chosen state
>>> backend [1]. In most cases, it's written to some file with a memory
>>> cache. It's possible to query the state [2], but not with SQL. In
>>> fact, it's so basic that we decided to drop the feature in the future
>>> to make room for a more sophisticated solution based around
>>> replicating the state to an external queryable form, but there is
>>> nothing specific yet.
>>> 2. It would help if you (re)read the section about state persistence
>>> [3]. Basically, the state is updated on every write access of the
>>> process function. Flink creates a checkpoint of the state periodically
>>> and can recover from these checkpoints. It's also possible to look
>>> into these checkpoints with the state processor API [4].
>>> 3. It's embedded. See above for what happens on failure.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>>> [4]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <swaga...@gmail.com>
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> On 2 - I was referring to stateful functions as an alternative to
>>>> windows, but in this particular use case it is not an exact fit, I
>>>> think, though a solution could be built around it.
>>>>
>>>> On the overall approach - what is the right way to use Flink SQL
>>>> here?
>>>>
>>>> Every event has the transaction time, which I am using as the event
>>>> time to assign the WatermarkStrategy
>>>> KeyBy - customerId
>>>> SlidingEventTimeWindows of 1 hr
>>>> then process all elements using a ProcessWindowFunction
>>>>
>>>> Extending the above:
>>>>
>>>> For the session window, taking the above example, reiterated below:
>>>>
>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other
>>>> at 11:20 am.
>>>> Customer2 has done 1 transaction, at 10:00 am.
>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>>>
>>>> 1 hour windows:
>>>> 9:30 AM - 10:30 AM: Customer 2
>>>> 10:30 AM - 11:30 AM: Customer 1, Customer 3
>>>> 11:30 AM - 12:30 PM: Customer 3
>>>>
>>>> Questions - how do we access the state?
>>>>
>>>> 1. Will the process window function write to an in-memory SQL table
>>>> that does not get flushed to a proper backing database, so all the
>>>> data stays in memory - and if yes, can that be queried?
>>>> 2. If the process window function writes to a proper backing
>>>> database, at what point should this happen? Because the API can query
>>>> the state at any point in time, the data that was flushed might be
>>>> stale and need recomputation.
>>>> 3. How do you recommend RocksDB be used as a state backend? Is that
>>>> the embedded RocksDB, or do you recommend an external implementation?
>>>> Embedded RocksDB state is lost when the container is restarted, I
>>>> guess, so we would need an external mechanism for restart/crash
>>>> recovery?
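>>>>
>>>> For reference, the window function I have in mind looks roughly like
>>>> this - a sketch only, assuming a Transaction POJO with getCustomerId()
>>>> and getAmount() accessors (all names are placeholders):
>>>>
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class TotalPerCustomer extends
>>>>         ProcessWindowFunction<Transaction, Tuple2<String, Double>, String, TimeWindow> {
>>>>
>>>>     @Override
>>>>     public void process(String customerId, Context context,
>>>>                         Iterable<Transaction> transactions,
>>>>                         Collector<Tuple2<String, Double>> out) {
>>>>         // sum all transactions of this customer that fall into the window
>>>>         double total = 0;
>>>>         for (Transaction t : transactions) {
>>>>             total += t.getAmount();
>>>>         }
>>>>         out.collect(Tuple2.of(customerId, total));
>>>>     }
>>>> }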
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> 1. It always depends on the data volume per user. A million users is
>>>>> not much if you compare it to the biggest Flink installations
>>>>> (Netflix, Alibaba, Pinterest, ...). However, for a larger scale and
>>>>> better scalability, I'd recommend using the RocksDB state backend [1].
>>>>>
>>>>> 2. Are you referring to statefun? I'd say that for your use case,
>>>>> Flink is a better fit. Statefun is more suitable when each actor
>>>>> (= user in your case) acts differently depending on the data, like in
>>>>> a state machine. In your case, your users should be processed in the
>>>>> same way: even if the windows are independently opened and closed,
>>>>> every user has at most one window open at a given event time. You
>>>>> probably also aggregate all user states more or less in the same way.
>>>>>
>>>>> Or did you refer to processing functions with state? That's certainly
>>>>> possible to implement, but it won't be much faster unless you can
>>>>> exploit some specific properties of your application. An example is
>>>>> written up in [2]. I'd recommend first using the regular, built-in
>>>>> windows and only switching to custom code if the performance is
>>>>> insufficient. Custom implementations may be faster now, but they come
>>>>> with a higher maintenance cost, and the built-in windows may be
>>>>> better optimized in the future.
>>>>>
>>>>> Lastly, if your query is relational in nature, I'd recommend having a
>>>>> look at the Table API/SQL [3]. Unless you really invest a lot of
>>>>> time, you won't be able to write more efficient code than what the
>>>>> Table API generates.
>>>>>
>>>>> [1] https://flink.apache.org/2021/01/18/rocksdb.html
>>>>> [2]
>>>>> https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>>>>>
>>>>> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <swaga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> 1. What if there is a very high number of users, like a million
>>>>>> customers? Won't the service crash? Is it advisable to hold the data
>>>>>> in memory?
>>>>>>
>>>>>> 2. What if stateful functions are used to calculate the value? How
>>>>>> would this approach differ from the one proposed below?
>>>>>>
>>>>>> Regards,
>>>>>> Swagat
>>>>>>
>>>>>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Sunitha,
>>>>>>>
>>>>>>> The approach you are describing sounds like you want to use a
>>>>>>> session window [1]. If you only want to count transactions when
>>>>>>> they happen within the same hour, then you want to use a tumbling
>>>>>>> window.
>>>>>>>
>>>>>>> Your datastream approach looks solid.
>>>>>>>
>>>>>>> For SQL, there is also a session (and tumbling) window [2]. You can
>>>>>>> see examples at the bottom of the section.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
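>>>>>>>
>>>>>>> In the DataStream API, a session window would look roughly like
>>>>>>> this - just a sketch; the Transaction POJO, its field names and the
>>>>>>> 1-hour gap are assumptions, and the stream must already have
>>>>>>> event-time timestamps and watermarks assigned:
>>>>>>>
>>>>>>> // transactions less than 1 hour apart are grouped into one session
>>>>>>> // per customer; the amount field is summed per session
>>>>>>> transactions
>>>>>>>         .keyBy(Transaction::getCustomerId)
>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.hours(1)))
>>>>>>>         .sum("amount")
>>>>>>>         .print();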
>>>>>>>
>>>>>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>>>>>>> s_penakalap...@yahoo.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have one requirement where I need to calculate the total amount
>>>>>>>> of transactions done by each user in the last 1 hour.
>>>>>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the
>>>>>>>> other at 11:20 am.
>>>>>>>> Customer2 has done 1 transaction, at 10:00 am.
>>>>>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and
>>>>>>>> 11:45 am.
>>>>>>>>
>>>>>>>> Whenever a customer does a transaction, we receive an event in the
>>>>>>>> source topic; we consume the data and need to calculate the total
>>>>>>>> amount spent by that customer in the last 1 hour.
>>>>>>>>
>>>>>>>> If I receive a new Customer1 transaction event at 11:30 am, then I
>>>>>>>> need to calculate the sum of the 3 events done by Customer1 in the
>>>>>>>> last 1 hour (i.e. 11:00, 11:20 and 11:30 am - all 3 events fall in
>>>>>>>> the last 1-hour window).
>>>>>>>> Now say I receive a new Customer2 transaction event at 11:30 am;
>>>>>>>> then for this customer I need to consider only the one event at
>>>>>>>> 11:30 (ignoring the event at 10:00 am as it does not fall in the
>>>>>>>> last 1 hr).
>>>>>>>> If Customer3's new transaction is done at 12:40 pm, then for this
>>>>>>>> customer I need to calculate the sum of (11:40 am, 11:45 am and
>>>>>>>> 12:40 pm), as all 3 fall within the last 1 hr.
>>>>>>>>
>>>>>>>> The approach I am planning to try (a rough sketch in code is at
>>>>>>>> the end of this mail):
>>>>>>>> Every event has the transaction time, which I am using as the
>>>>>>>> event time to assign the WatermarkStrategy
>>>>>>>> KeyBy - customerId
>>>>>>>> SlidingEventTimeWindows of 1 hr
>>>>>>>> then process all elements using a ProcessWindowFunction
>>>>>>>>
>>>>>>>> Kindly suggest the approach I need to follow to achieve the above
>>>>>>>> scenario using Flink Java/SQL. I am using Flink 1.12.0.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Sunitha
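>>>>>>>>
>>>>>>>> P.S. In code, the approach above would look roughly like this - a
>>>>>>>> sketch only; the Transaction POJO, the 10-minute slide, the
>>>>>>>> 1-minute out-of-orderness bound and the TotalPerCustomer window
>>>>>>>> function are placeholders, and getTransactionTime() is assumed to
>>>>>>>> return epoch milliseconds:
>>>>>>>>
>>>>>>>> transactions
>>>>>>>>         // event time = transaction time, tolerating 1 minute of
>>>>>>>>         // out-of-order events
>>>>>>>>         .assignTimestampsAndWatermarks(
>>>>>>>>                 WatermarkStrategy
>>>>>>>>                         .<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(1))
>>>>>>>>                         .withTimestampAssigner((txn, ts) -> txn.getTransactionTime()))
>>>>>>>>         .keyBy(Transaction::getCustomerId)
>>>>>>>>         // 1-hour windows, evaluated every 10 minutes
>>>>>>>>         .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(10)))
>>>>>>>>         // any ProcessWindowFunction that sums the amounts per customer
>>>>>>>>         .process(new TotalPerCustomer())
>>>>>>>>         .print();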