Hi Arvid,

I sent a separate mail to the user community titled "Session Windows - not
working as expected".

All other details are here if you need them; I am closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Thu, May 6, 2021 at 1:50 AM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi Arvid,
>
> I sent a separate mail titled "Session Windows - not working as expected"
> and am closing this thread.
>
> Please have a look when you have a few minutes, much appreciated.
>
> Regards,
> Swagat
>
>
> On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I tried a small POC to reproduce the behaviour, but somehow I don't see the
>> process function getting called. Am I doing something wrong?
>>
>> customerStream
>>         .keyBy(Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>>         .process(new CustomAggregateFunction())
>>         .print();
>>
>> the process function looks like below
>>
>> public class CustomAggregateFunction
>>         extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>>
>>     @Override
>>     public void process(String key, Context context, Iterable<Customer> iterable,
>>             Collector<CustomerAggregate> collector) throws Exception {
>>         System.out.println("in aggregation");
>>     }
>> }
>>
>> the customer generator
>>
>> public class CustomerGenerator implements SourceFunction<Customer> {
>>
>>     volatile boolean isRunning = true;
>>
>>     private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>>
>>     @Override
>>     public void run(SourceContext<Customer> sourceContext) throws Exception {
>>         int counter = 1;
>>
>>         while (isRunning) {
>>             Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
>>             System.out.println("Writing customer: " + c);
>>             sourceContext.collect(c);
>>             Thread.sleep(1000);
>>             counter++;
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>> }
>>
>>
>> Customer object
>>
>> public class Customer {
>>     private String identifier;
>>     private LocalTime eventTime;
>>     private double amount;
>>
>>     public Customer(String identifier, LocalTime eventTime, double amount) {
>>         this.identifier = identifier;
>>         this.amount = amount;
>>         this.eventTime = eventTime;
>>     }
>>
>>     public String getIdentifier() {
>>         return identifier;
>>     }
>>
>>     public LocalTime getEventTime() {
>>         return eventTime;
>>     }
>>
>>     public double getAmount() {
>>         return amount;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "Customer{" +
>>                 "identifier='" + identifier + '\'' +
>>                 ", eventTime=" + eventTime +
>>                 ", amount=" + amount +
>>                 '}';
>>     }
>> }
>>
>>
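A possible explanation for the silent process function, offered as a sketch rather than a confirmed diagnosis: the snippet above never assigns timestamps or watermarks, and with the default event-time trigger a session window only fires once the watermark reaches the end of the window. In addition, the generator gives each key a new event roughly every 5 seconds, which is shorter than the 8-second gap, so each session would keep being extended while the source runs. The plain-Java sketch below has no Flink dependency; the names EventTimestamps, toEpochMillis and canFire are hypothetical, and the date, zone and 2-second out-of-orderness bound are assumptions.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;

public class EventTimestamps {

    /** Combines a time-of-day with an explicit date and zone into epoch milliseconds. */
    public static long toEpochMillis(LocalTime eventTime, LocalDate date, ZoneId zone) {
        return date.atTime(eventTime).atZone(zone).toInstant().toEpochMilli();
    }

    /** A window [start, end) may fire once the watermark has reached end - 1. */
    public static boolean canFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    // In the Flink job this helper could be wired in roughly as follows
    // (not compiled here; the bound, date and zone are assumptions):
    //   customerStream.assignTimestampsAndWatermarks(
    //       WatermarkStrategy.<Customer>forBoundedOutOfOrderness(Duration.ofSeconds(2))
    //           .withTimestampAssigner((c, ts) -> toEpochMillis(c.getEventTime(), date, zone)));

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        LocalDate day = LocalDate.of(2021, 5, 5);
        long ts = toEpochMillis(LocalTime.of(11, 0), day, utc);
        long gapMillis = 8_000L;            // session gap from the POC
        long windowEnd = ts + gapMillis;    // end of a single-element session

        System.out.println(canFire(windowEnd, Long.MIN_VALUE)); // no watermark yet: false
        System.out.println(canFire(windowEnd, windowEnd));      // watermark past the end: true
    }
}
```

Since LocalTime carries no date, the helper needs one supplied explicitly; storing an Instant or epoch milliseconds in Customer would avoid that step.
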
>>
>> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Swagat,
>>>
>>> 1. Where the data primarily resides depends on the chosen state backend
>>> [1]. In most cases, it's written to some file with a memory cache. It's
>>> possible to query the state [2] but not with SQL. In fact, it's so basic
>>> that we decided to drop the feature in the future to make room for a more
>>> sophisticated solution based around replicating the state to an external
>>> queryable form but there is nothing specific yet.
>>> 2. It would help if you (re)read the section about state persistence.
>>> [3] Basically, the state is updated on every write access of the process
>>> function. Flink creates a checkpoint of the state periodically and can
>>> recover from these checkpoints. It's also possible to look into these
>>> checkpoints with the State Processor API [4].
>>> 3. It's embedded. See above for what happens on failure.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>>> [4]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <swaga...@gmail.com>
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> On 2 - I was referring to stateful functions as an alternative to
>>>> windows, but I think it does not fit this particular use case exactly,
>>>> though a solution can be built around it.
>>>>
>>>> On the overall approach, what's the right way to use Flink SQL here?
>>>>
>>>> Every event has the transaction time which I am using as event time to
>>>> assign WatermarkStrategy
>>>> KeyBy - customerId
>>>> SlidingEventTimeWindows of 1 hr
>>>> then process all elements using ProcessWindowFunction
>>>>
>>>> Extending the above:
>>>>
>>>> For the session window, take the above example, reiterated below:
>>>>
>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other at
>>>> 11:20 am.
>>>> Customer2 has done 1 transaction at 10:00 am.
>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>>>
>>>> 1 hour window:
>>>> 9:30AM - 10:30 AM : Customer 2
>>>> 10:30 AM - 11:30 AM : Customer 1, Customer 3
>>>> 11:30 AM - 12:30 PM : Customer 3
>>>>
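The three buckets above are fixed, non-overlapping one-hour windows shifted by 30 minutes, which is closer to a tumbling window with an offset than to a session or sliding window. A minimal sketch of the arithmetic, in plain Java with times expressed as minutes since midnight (the class and method names are hypothetical):

```java
public class WindowStart {

    /** Start of the fixed-size window containing ts; all values in minutes since midnight. */
    public static long windowStart(long ts, long offset, long size) {
        return ts - Math.floorMod(ts - offset, size);
    }

    /** Formats minutes since midnight as HH:mm. */
    public static String hhmm(long minutes) {
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }

    public static void main(String[] args) {
        long size = 60;    // one-hour windows
        long offset = 30;  // windows start at half past the hour
        long[] events = {600, 660, 680, 700, 705}; // 10:00, 11:00, 11:20, 11:40, 11:45
        for (long ts : events) {
            long start = windowStart(ts, offset, size);
            System.out.println(hhmm(ts) + " -> [" + hhmm(start) + ", " + hhmm(start + size) + ")");
        }
    }
}
```

With these inputs, 10:00 lands in the 09:30 window, 11:00 and 11:20 in the 10:30 window, and 11:40 and 11:45 in the 11:30 window, matching the listing above. In Flink, TumblingEventTimeWindows.of(size, offset) accepts such an offset; without it, one-hour windows align to the full hour.
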
>>>> Questions - how do we access the state?
>>>>
>>>>    1. Will the process window function write to an in-memory SQL table
>>>>       that does not get flushed to a proper backing database, so that all
>>>>       the data stays in memory? If yes, can that be queried?
>>>>    2. If the process window function writes to a proper backing database,
>>>>       at what point should this happen? The API can query the state at
>>>>       any point in time, so the data that was flushed might be stale and
>>>>       need recomputation.
>>>>    3. How do you recommend RocksDB be used as a state backend? Is that
>>>>       the embedded RocksDB, or do you recommend an external
>>>>       implementation? I guess embedded RocksDB state is lost when the
>>>>       container is restarted, so will we need an external mechanism for
>>>>       restart/crash recovery?
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>>
>>>>
>>>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> 1. It always depends on the data volume per user. A million users is
>>>>> not much if you compare it to the biggest Flink installations (Netflix,
>>>>> Alibaba, Pinterest, ...). However, for larger scale and scalability, I'd
>>>>> recommend using the RocksDB state backend. [1]
>>>>>
>>>>> 2. Are you referring to statefun? I'd say that for your use case,
>>>>> Flink is a better fit. Statefun is more suitable when each actor (=user in
>>>>> your case) acts differently depending on the data like in a state machine.
>>>>> In your case, your users should be processed in the same way: Even if the
>>>>> windows are independently opened and closed, every user has at most one
>>>>> window open at a given event time. You probably also aggregate all user
>>>>> states more or less in the same way.
>>>>>
>>>>> Or did you refer to processing functions with state? That's certainly
>>>>> possible to implement but it won't be much faster unless you can exploit
>>>>> some specific properties of your application. An example is written in [2].
>>>>> I'd recommend first using regular, built-in windows and only switching to
>>>>> custom code if the performance is insufficient. Custom implementations may
>>>>> be faster now, but come with a higher maintenance cost and the built-in
>>>>> windows may be better optimized in future.
>>>>>
>>>>> Lastly, if your query is of a relational nature, I'd recommend having a
>>>>> look at Table API/SQL [3]. Unless you really invest a lot of time, you
>>>>> won't be able to write more efficient code than what Table API is
>>>>> generating.
>>>>>
>>>>> [1] https://flink.apache.org/2021/01/18/rocksdb.html
>>>>> [2]
>>>>> https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>>>>>
>>>>> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <swaga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>  1. What if there is a very high number of users, like a million
>>>>>> customers? Won't the service crash? Is it advisable to hold the data in
>>>>>> memory?
>>>>>>
>>>>>> 2. What if stateful functions are used to calculate the value? How will
>>>>>> this approach differ from the one proposed below?
>>>>>>
>>>>>> Regards,
>>>>>> Swagat
>>>>>>
>>>>>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Sunitha,
>>>>>>>
>>>>>>> the approach you are describing sounds like you want to use a
>>>>>>> session window. [1] If you only want to count them when they happen
>>>>>>> within the same hour, then you want to use a tumbling window.
>>>>>>>
>>>>>>> Your datastream approach looks solid.
>>>>>>>
>>>>>>> For SQL, there is also a session (and tumbling) window [2]. You can
>>>>>>> see examples at the bottom of the section.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>>>>>
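To make the session-window idea concrete, here is a minimal plain-Java sketch (not Flink code) that groups one customer's event times into sessions: a new session starts whenever the distance to the previous event exceeds the gap. The 30-minute gap, the class name and the use of minutes since midnight are assumptions for illustration; the exact boundary behaviour in Flink should be checked against its documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionSketch {

    /** Splits timestamps into sessions; a new session starts when the pause exceeds gap. */
    public static List<List<Long>> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            // Close the running session if this event arrives after a pause longer than gap.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Customer3 from the example: 11:20, 11:40, 11:45 and a later event at 12:40.
        long[] customer3 = {680, 700, 705, 760};
        System.out.println(sessions(customer3, 30)); // [[680, 700, 705], [760]]
    }
}
```

Note that a session groups events by activity pauses, so it does not by itself answer "total in the last hour"; that depends on how the gap relates to the one-hour requirement.
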
>>>>>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>>>>>>> s_penakalap...@yahoo.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have a requirement where I need to calculate the total amount of
>>>>>>>> transactions done by each user in the last 1 hour.
>>>>>>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other
>>>>>>>> at 11:20 am.
>>>>>>>> Customer2 has done 1 transaction at 10:00 am.
>>>>>>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and
>>>>>>>> 11:45 am.
>>>>>>>>
>>>>>>>> Whenever a customer does a transaction, we receive an event in the
>>>>>>>> source topic. We consume the data and need to calculate the total
>>>>>>>> amount spent by the customer in the last 1 hour.
>>>>>>>>
>>>>>>>> If I receive a new transaction event for Customer1 at 11:30 am, then
>>>>>>>> I need to calculate the sum of the 3 events done by Customer1 in the
>>>>>>>> last 1 hour (i.e. 11:00, 11:20 and 11:30 am; all 3 events fall in the
>>>>>>>> last 1 hour window).
>>>>>>>> Now say I receive a new transaction event for Customer2 at 11:30 am;
>>>>>>>> for this customer I need to consider only one event, 11:30 (ignoring
>>>>>>>> the event at 10:00 am as it does not fall in the last 1 hr).
>>>>>>>> If Customer3 does a new transaction at 12:40 pm, then for this
>>>>>>>> customer I need to calculate the sum of (11:40 am, 11:45 am and
>>>>>>>> 12:40 pm), as all 3 fall within the last 1 hr.
>>>>>>>>
>>>>>>>> Approach I am planning to try:
>>>>>>>> Every event has the transaction time which I am using as event time
>>>>>>>> to assign WatermarkStrategy
>>>>>>>> KeyBy - customerId
>>>>>>>> SlidingEventTimeWindows of 1 hr
>>>>>>>> then process all elements using ProcessWindowFunction
>>>>>>>>
>>>>>>>>
>>>>>>>> Kindly suggest the approach I need to follow to achieve the above
>>>>>>>> scenario using Flink Java/SQL. I am using Flink 1.12.0.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Sunitha
>>>>>>>>
>>>>>>>
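The requirement in the original question (for each new transaction, sum that customer's transactions in the preceding hour, inclusive) can be sketched in plain Java as follows. This is not Flink code and not necessarily how any of the windowing approaches discussed in the thread would compute it; it assumes events arrive in time order per customer, uses minutes since midnight, and uses an amount of 100 per transaction purely for illustration. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class LastHourTotals {

    private static final long WINDOW_MINUTES = 60;

    /** One transaction: event time in minutes since midnight, plus the amount. */
    private static final class Txn {
        final long minute;
        final double amount;

        Txn(long minute, double amount) {
            this.minute = minute;
            this.amount = amount;
        }
    }

    // Per-customer queue of transactions still inside the last hour.
    private final Map<String, Deque<Txn>> recent = new HashMap<>();

    /** Records a transaction and returns the customer's total over the last hour, inclusive. */
    public double onTransaction(String customerId, long minute, double amount) {
        Deque<Txn> queue = recent.computeIfAbsent(customerId, k -> new ArrayDeque<>());
        queue.addLast(new Txn(minute, amount));

        // Evict everything older than one hour before the new event.
        while (queue.peekFirst().minute < minute - WINDOW_MINUTES) {
            queue.removeFirst();
        }

        double total = 0;
        for (Txn t : queue) {
            total += t.amount;
        }
        return total;
    }

    public static void main(String[] args) {
        LastHourTotals totals = new LastHourTotals();
        totals.onTransaction("Customer3", 680, 100); // 11:20
        totals.onTransaction("Customer3", 700, 100); // 11:40
        totals.onTransaction("Customer3", 705, 100); // 11:45
        // 12:40: the 11:20 event is dropped; 11:40, 11:45 and 12:40 remain.
        System.out.println(totals.onTransaction("Customer3", 760, 100)); // 300.0
    }
}
```

In a Flink job the same per-key bookkeeping would live in keyed state rather than in a HashMap, and out-of-order events would need additional handling.
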
