Hi Arvid, I sent a separate mail titled "Session Windows - not working as expected", closing this thread. Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi Arvid,
>
> I tried a small POC to reproduce the behaviour, but somehow I don't see
> the process function getting called. Am I doing something wrong?
>
> customerStream
>         .keyBy(Customer::getIdentifier)
>         .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>         .process(new CustomAggregateFunction())
>         .print();
>
> The process function looks like below:
>
> public class CustomAggregateFunction
>         extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>
>     @Override
>     public void process(String key, Context context, Iterable<Customer> iterable,
>                         Collector<CustomerAggregate> collector) throws Exception {
>         System.out.println("in aggregation");
>     }
> }
>
> The customer generator:
>
> public class CustomerGenerator implements SourceFunction<Customer> {
>
>     volatile boolean isRunning = true;
>
>     private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>
>     @Override
>     public void run(SourceContext<Customer> sourceContext) throws Exception {
>         int counter = 1;
>
>         while (isRunning) {
>             Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
>             System.out.println("Writing customer: " + c);
>             sourceContext.collect(c);
>             Thread.sleep(1000);
>             counter++;
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
>
> The Customer object:
>
> public class Customer {
>     private String identifier;
>     private LocalTime eventTime;
>     private double amount;
>
>     public Customer(String identifier, LocalTime eventTime, double amount) {
>         this.identifier = identifier;
>         this.amount = amount;
>         this.eventTime = eventTime;
>     }
>
>     public String getIdentifier() {
>         return identifier;
>     }
>
>     public LocalTime getEventTime() {
>         return eventTime;
>     }
>
>     public double getAmount() {
>         return amount;
>     }
>
>     @Override
>     public String toString() {
>         return "Customer{" +
>                 "identifier='" + identifier + '\'' +
>                 ", eventTime=" + eventTime +
>                 ", amount=" + amount +
>                 '}';
>     }
> }
>
> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Swagat,
>>
>> 1. Where the data primarily resides depends on the chosen state backend
>> [1]. In most cases, it's written to some file with a memory cache. It's
>> possible to query the state [2], but not with SQL. In fact, it's so basic
>> that we decided to drop the feature in the future to make room for a more
>> sophisticated solution based around replicating the state to an external
>> queryable form, but there is nothing specific yet.
>> 2. It would help if you (re)read the section about state persistence [3].
>> Basically, the state is updated on every write access of the process
>> function. Flink creates a checkpoint of the state periodically and can
>> recover from these checkpoints. It's also possible to look into these
>> checkpoints with the State Processor API [4].
>> 3. It's embedded. See above for what happens on failure.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> On 2 - I was referring to stateful functions as an alternative to
>>> windows, but in this particular use case it doesn't fit exactly, I
>>> think, though a solution could be built around it.
>>> On the overall approach, what's the right way to use Flink SQL here:
>>>
>>> Every event has the transaction time, which I am using as event time to
>>> assign a WatermarkStrategy
>>> KeyBy customerId
>>> SlidingEventTimeWindows of 1 hr
>>> then process all elements using ProcessWindowFunction
>>>
>>> Extending the above...
>>>
>>> For the session window, taking the earlier example, reiterated below:
>>>
>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other at
>>> 11:20 am.
>>> Customer2 has done 1 transaction, at 10:00 am.
>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>>
>>> 1-hour windows:
>>> 9:30 AM - 10:30 AM: Customer 2
>>> 10:30 AM - 11:30 AM: Customer 1, Customer 3
>>> 11:30 AM - 12:30 PM: Customer 3
>>>
>>> Questions - how do we access the state?
>>>
>>> 1. Will the process window function write to an in-memory SQL table that
>>> does not get flushed to a proper backing database, so all the data stays
>>> in memory? If yes, can that be queried?
>>> 2. If the process window function writes to a proper backing database,
>>> at what point should this happen? Because the API can query the state at
>>> any point in time, the data that was flushed might be stale and need
>>> recomputation.
>>> 3. How do you recommend RocksDB be used as a state backend? Is that the
>>> embedded RocksDB, or do you recommend an external implementation?
>>> Embedded RocksDB state is lost when the container is restarted, I guess,
>>> so we will have to have an external mechanism for restart/crash recovery?
>>>
>>> Regards,
>>> Swagat
>>>
>>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> 1. It always depends on the data volume per user. A million users is
>>>> not much if you compare it to the biggest Flink installations (Netflix,
>>>> Alibaba, Pinterest, ...). However, for a larger scale and scalability,
>>>> I'd recommend using the RocksDB state backend. [1]
>>>>
>>>> 2.
>>>> Are you referring to statefun? I'd say that for your use case, Flink is
>>>> a better fit. Statefun is more suitable when each actor (= user in your
>>>> case) acts differently depending on the data, like in a state machine.
>>>> In your case, your users should be processed in the same way: even if
>>>> the windows are independently opened and closed, every user has at most
>>>> one window open at a given event time. You probably also aggregate all
>>>> user states more or less in the same way.
>>>>
>>>> Or did you refer to processing functions with state? That's certainly
>>>> possible to implement, but it won't be much faster unless you can
>>>> exploit some specific properties of your application. An example is
>>>> written in [2]. I'd recommend first using regular, built-in windows and
>>>> only switching to custom code if the performance is insufficient.
>>>> Custom implementations may be faster now, but they come with a higher
>>>> maintenance cost, and the built-in windows may be better optimized in
>>>> the future.
>>>>
>>>> Lastly, if your query is of a relational nature, I'd recommend having a
>>>> look at the Table API/SQL [3]. Unless you really invest a lot of time,
>>>> you won't be able to write more efficient code than what the Table API
>>>> generates.
>>>>
>>>> [1] https://flink.apache.org/2021/01/18/rocksdb.html
>>>> [2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>>>>
>>>> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <swaga...@gmail.com> wrote:
>>>>
>>>>> 1. What if there is a very high number of users, like a million
>>>>> customers - won't the service crash? Is it advisable to hold the data
>>>>> in memory?
>>>>>
>>>>> 2. What if stateful functions are used to calculate the value? How
>>>>> will this approach differ from the one proposed below?
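Arvid's point that every user has at most one session window open at a time is a useful contrast with the SlidingEventTimeWindows approach discussed earlier in the thread, where each event belongs to size/slide overlapping windows at once. Here is a plain-Java sketch of that assignment (outside Flink; the 5-minute slide is an assumption, as the thread only fixes the 1-hour size):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {

    // Computes the start times (ms) of all sliding windows of length sizeMs,
    // sliding every slideMs, that contain the given event timestamp. This
    // mirrors how sliding event-time windows assign elements, outside Flink.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - ((timestampMs % slideMs + slideMs) % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long slide = 300_000L; // 5 minutes (assumed; only the 1 h size comes from the thread)
        long event = 11 * hour + 30 * 60_000L; // 11:30 as ms since midnight

        // A 1 h window with a 5 min slide puts each event into 12 windows,
        // from [10:35, 11:35) through [11:30, 12:30).
        System.out.println(windowStarts(event, hour, slide).size()); // 12
    }
}
```

Each event is counted once per overlapping window, which is why a sliding window only approximates "the last hour as of this event": the window boundaries are aligned to the slide, not to each event's timestamp.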
>>>>> Regards,
>>>>> Swagat
>>>>>
>>>>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Sunitha,
>>>>>>
>>>>>> The approach you are describing sounds like you want to use a session
>>>>>> window [1]. If you only want to count transactions that happen within
>>>>>> the same hour, then you want to use a tumbling window.
>>>>>>
>>>>>> Your datastream approach looks solid.
>>>>>>
>>>>>> For SQL, there are also session (and tumbling) windows [2]. You can
>>>>>> see examples at the bottom of the section.
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>>>>
>>>>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <s_penakalap...@yahoo.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have one requirement where I need to calculate the total amount of
>>>>>>> transactions done by each user in the last 1 hour.
>>>>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other
>>>>>>> at 11:20 am.
>>>>>>> Customer2 has done 1 transaction, at 10:00 am.
>>>>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>>>>>>
>>>>>>> Whenever a customer does a transaction, we receive an event in the
>>>>>>> source topic; we consume the data and need to calculate the total
>>>>>>> amount spent by the customer in the last 1 hour.
>>>>>>> If I received a new Customer1 transaction event at 11:30 am, then I
>>>>>>> need to calculate the sum of the 3 events done by Customer1 in the
>>>>>>> last 1 hour (i.e. 11:00, 11:20 and 11:30 am - all 3 events fall in
>>>>>>> the last 1-hour window).
>>>>>>> Now say I receive a new Customer2 transaction event at 11:30 am;
>>>>>>> then for this customer I need to consider only the one event at
>>>>>>> 11:30 (ignoring the event at 10:00 am, as it does not fall in the
>>>>>>> last 1 hr).
>>>>>>> If Customer3's new transaction is done at 12:40 pm, then for this
>>>>>>> customer I need to calculate the sum of (11:40 am, 11:45 am and
>>>>>>> 12:40 pm), as all 3 fall within the last 1 hr.
>>>>>>>
>>>>>>> Approach I am planning to try:
>>>>>>> Every event has the transaction time, which I am using as event time
>>>>>>> to assign a WatermarkStrategy
>>>>>>> KeyBy customerId
>>>>>>> SlidingEventTimeWindows of 1 hr
>>>>>>> then process all elements using ProcessWindowFunction
>>>>>>>
>>>>>>> Kindly suggest the approach I need to follow to achieve the above
>>>>>>> scenario using Flink Java/SQL. I am using Flink 1.12.0.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Sunitha
>>>>>>
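The three examples in the question can be checked outside Flink with a few lines of plain Java. This is only an illustration of the intended "last 1 hour as of the new event" semantics (class and method names are mine), not a streaming implementation; note that a sliding window only approximates it, because window boundaries are aligned to the slide rather than to each event:

```java
import java.time.LocalTime;
import java.util.List;

public class LastHourDemo {

    // Counts a customer's transactions in the hour ending at the new event's
    // time, inclusive of both boundaries, mirroring the thread's examples.
    public static long eventsInLastHour(List<LocalTime> history, LocalTime newEvent) {
        LocalTime cutoff = newEvent.minusHours(1);
        return history.stream()
                .filter(t -> !t.isBefore(cutoff) && !t.isAfter(newEvent))
                .count();
    }

    public static void main(String[] args) {
        // Customer1: 11:00, 11:20, new event 11:30 -> all 3 in the last hour.
        System.out.println(eventsInLastHour(
                List.of(LocalTime.of(11, 0), LocalTime.of(11, 20), LocalTime.of(11, 30)),
                LocalTime.of(11, 30))); // 3

        // Customer2: 10:00 is outside [10:30, 11:30]; only the 11:30 event counts.
        System.out.println(eventsInLastHour(
                List.of(LocalTime.of(10, 0), LocalTime.of(11, 30)),
                LocalTime.of(11, 30))); // 1

        // Customer3: 11:20 is outside [11:40, 12:40]; 11:40, 11:45, 12:40 count.
        System.out.println(eventsInLastHour(
                List.of(LocalTime.of(11, 20), LocalTime.of(11, 40),
                        LocalTime.of(11, 45), LocalTime.of(12, 40)),
                LocalTime.of(12, 40))); // 3
    }
}
```

Getting exactly this per-event behaviour in Flink would need a KeyedProcessFunction holding recent transactions in keyed state, as Arvid's fraud-detection link [2] sketches; summing amounts instead of counting events is a one-line change to the filter's terminal operation.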