Hi Arvid,

I sent a separate mail titled - Session Windows - not working as expected

closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat


On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi Arvid,
>
> Tried a small POC to reproduce the behaviour, somehow dont see the process
> function getting called, am I doing something wrong?
>
> customerStream
>         .keyBy(Customer::getIdentifier)
>         .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>         .process(new CustomAggregateFunction())
>         .print();
>
> the process function looks like below
>
> public class CustomAggregateFunction extends ProcessWindowFunction<Customer, 
> CustomerAggregate, String, TimeWindow> {
>
>     @Override
>     public void process(String key, Context context, Iterable<Customer> 
> iterable, Collector<CustomerAggregate> collector) throws Exception {
>         System.out.println("in aggregation");
>     }
> }
>
> the customer generator
>
> public class CustomerGenerator implements SourceFunction<Customer> {
>
>     volatile boolean isRunning = true;
>
>     private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>
>     @Override
>     public void run(SourceContext<Customer> sourceContext) throws Exception {
>         int counter = 1;
>
>         while (isRunning) {
>             Customer c = new Customer(CUSTOMER_KEY[counter % 5], 
> LocalTime.now(), 1000);
>             System.out.println("Writing customer: " + c);
>             sourceContext.collect(c);
>             Thread.sleep(1000);
>             counter++;
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
>
>
> Customer object
>
> public class Customer {
>     private String identifier;
>     private LocalTime eventTime;
>     private double amount;
>
>     public Customer(String identifier, LocalTime eventTime, double amount) {
>         this.identifier = identifier;
>         this.amount = amount;
>         this.eventTime = eventTime;
>     }
>
>     public String getIdentifier() {
>         return identifier;
>     }
>
>     public LocalTime getEventTime() {
>         return eventTime;
>     }
>
>     public double getAmount() {
>         return amount;
>     }
>
>     @Override
>     public String toString() {
>         return "Customer{" +
>                 "identifier='" + identifier + '\'' +
>                 ", eventTime=" + eventTime +
>                 ", amount=" + amount +
>                 '}';
>     }
> }
>
>
>
> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Swagat,
>>
>> 1. Where the data primarily resides depends on the chosen state backend
>> [1]. In most cases, it's written to some file with a memory cache. It's
>> possible to query the state [2] but not with SQL. In fact, it's so basic
>> that we decided to drop the feature in the future to make room for a more
>> sophisticated solution based around replicating the state to an external
>> queryable form but there is nothing specific yet.
>> 2. It would help if you (re)read the section about state persistence. [3]
>> Basically, the state is updated on every write access of the process
>> function. Flink creates a checkpoint of the state periodically and can
>> recover from these checkpoint. It's also possible to look into these
>> checkpoint with the state processor API [4].
>> 3. It's embedded. See above to what happens on failure.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>> [4]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <swaga...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> On 2 - I was referring to stateful functions as an alternative to
>>> windows, but in this particular use case, its not fitting in exactly I
>>> think, though a solution can be built around it.
>>>
>>> On the overall approach here what's the right way to use Flink SQL:
>>>
>>> Every event has the transaction time which I am using as event time to 
>>> assign WatermarkStrategy
>>> KeyBy - customerId
>>> SlidingEventTimeWindows of 1 hr
>>> then process all elements using ProcessWindowFunction
>>>
>>> Extending above..
>>>
>>> For the session window, taking the above example , reiterated below:
>>>
>>> Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 
>>> am.
>>> Customer2 has done 1 transaction one at 10:00 am
>>> Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.
>>>
>>> 1 hour window:
>>> 9:30AM - 10:30 AM : Customer 2
>>> 10:30 AM - 11:30 AM : Customer 1, Customer 3
>>> 11:30 AM - 12:30 PM : Customer 3
>>>
>>> Questions - how do we access the state?
>>>
>>>    1. Will the process window function write to an in-memory SQL table that 
>>> does not get flushed to a proper backing database, so all the data stays 
>>> in-memory -  if yes can that be queried?
>>>    2. If the process window function writes to a proper backing database, 
>>> at what point should this happen? Because the API can query the state at 
>>> any point of time, so the data that was flushed might be state and need 
>>> recomputation.
>>>    3. How do you recommend for rock db to be used as a state backend? Is 
>>> that the embedded rocks db or do you recommend an external implementation. 
>>> Embedded rocks db state is lost when the container is restarted i guess, so 
>>> we will have to have an external mechanism for restart/ crash recovery?
>>>
>>> Regards,
>>> Swagat
>>>
>>>
>>>
>>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> 1. It always depends on the data volume per user. A million user is not
>>>> much if you compare it to the biggest Flink installations (Netflix,
>>>> Alibaba, PInterest, ...). However, for a larger scale and scalability, I'd
>>>> recommend to use rocksDB state backend. [1]
>>>>
>>>> 2. Are you referring to statefun? I'd say that for your use case, Flink
>>>> is a better fit. Statefun is more suitable when each actor (=user in your
>>>> case) acts differently depending on the data like in a state machine. In
>>>> your case, your users should be processed in the same way: Even if the
>>>> windows are independently opened and closed, every user has only at most
>>>> one window open at a given event time. You probably also aggregate all user
>>>> states more or less in the same way.
>>>>
>>>> Or did you refer to processing functions with state? That's certainly
>>>> possible to implement but it won't be much faster unless you can exploit
>>>> some specific properties of your application. An example is written in [2].
>>>> I'd recommend to first use regular, built-in windows and only switch to
>>>> custom code if the performance is insufficient. Custom implementations may
>>>> be faster now, but come with a higher maintenance cost and the built-in
>>>> windows may be better optimized in future.
>>>>
>>>> Lastly if your query is of relational nature, I'd recommend to have a
>>>> look at Table API/SQL [3]. Unless you really invest a lot of time, you
>>>> won't be able to write more efficient code than what Table API is
>>>> generating.
>>>>
>>>> [1] https://flink.apache.org/2021/01/18/rocksdb.html
>>>> [2]
>>>> https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>>>>
>>>> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <swaga...@gmail.com>
>>>> wrote:
>>>>
>>>>>  1. What if there are a very high number of users, like a million
>>>>> customers won't the service crash? Is it advisable to hold the data in
>>>>> memory.
>>>>>
>>>>> 2. What if state-functions are used to calculate the value ? How will
>>>>> this approach differ from the one proposed below.
>>>>>
>>>>> Regards,
>>>>> Swagat
>>>>>
>>>>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Sunitha,
>>>>>>
>>>>>> the approach you are describing sounds like you want to use a session
>>>>>> window. [1] If you only want to count them if they happen at the same 
>>>>>> hour
>>>>>> then, you want to use a tumbling window.
>>>>>>
>>>>>> Your datastream approach looks solid.
>>>>>>
>>>>>> For SQL, there is also a session (and tumbling) window [2]. You can
>>>>>> see examples at the bottom of the section.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>>>>
>>>>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>>>>>> s_penakalap...@yahoo.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have one requirement where I need to calculate total amount of
>>>>>>> transactions done by each each user in last 1 hour.
>>>>>>> Say Customer1 has done 2 transactions one at 11:00am and other one
>>>>>>> at 11:20 am.
>>>>>>> Customer2 has done 1 transaction one at 10:00 am
>>>>>>> Customer3 has done 3 transactions one at 11:20 am,  11:40 am and
>>>>>>> 11:45 am.
>>>>>>>
>>>>>>> when ever customer does a transaction then we receive an event in
>>>>>>> source topic, we consume the data and need to calculate the total amount
>>>>>>> spent by Customer in last 1 hour.
>>>>>>>
>>>>>>> if I have received customer1 new transaction event at 11:30 am then
>>>>>>> I need to calculate the sum of 3 events done by customer1 in last 1 hour
>>>>>>> (i.e 11:00 , 11:20 and 11:30 am - all 3 events fall in last 1 hour 
>>>>>>> window)
>>>>>>> Now say I receive Customer2 new transaction event at 11:30 am then
>>>>>>> for this customer I need to consider only one event 11:30 (ignoring the
>>>>>>> event at  10:00 am  as it does not fall in last 1 hr)
>>>>>>> Customer3 new transaction is done at 12:40 pm then for this Customer
>>>>>>> I need to calculate sum of ( 11:40 am , 11:45 am and 12:40pm) as all 3 
>>>>>>> fall
>>>>>>> under last 1 hr.
>>>>>>>
>>>>>>> Approach I am planning to try:
>>>>>>> Every event has the transaction time which I am using as event time
>>>>>>> to assign WatermarkStrategy
>>>>>>> KeyBy - customerId
>>>>>>> SlidingEventTimeWindows of 1 hr
>>>>>>> then process all elements using ProcessWindowFunction
>>>>>>>
>>>>>>>
>>>>>>> Kindly suggest the approach I need to follow to achieve the above
>>>>>>> scenario using Flink Java /Sql. I am using Flink 1.12.0.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Sunitha
>>>>>>>
>>>>>>

Reply via email to