Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I am able to maintain a list state in a process function and aggregate the values. How do I get a notification/event to remove a value from the stored list state? On Thu, May 6, 2021 at 8:47 PM Swagat Mishra wrote: > I meant "Do you recommend the state to be maintained in ValueState or > ex

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I meant "Do you recommend the state to be maintained in ValueState or an external store like Elastic?" On Thu, May 6, 2021 at 8:46 PM Swagat Mishra wrote: > I want to aggregate the user activity e.g. number of products the user has > purchased in the last 1 hour. > > so - User A (ID = USER-A)

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I want to aggregate the user activity, e.g. the number of products the user has purchased in the last 1 hour. So, User A (ID = USER-A) purchases a product at 10:30 and another product at 10:45 AM and another product at 1:30 AM. My API should give 2 products purchased if the API call happens at 11:29
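The requirement above (count a user's purchases in the trailing hour, as of the time of the API call) can be sketched in plain Java. This is only an illustration of the eviction logic, not Flink code: `RollingPurchaseCounter`, `record`, and `countLastHour` are hypothetical names, the window is assumed to be the half-open interval (query time minus one hour, query time], and timestamps are assumed to arrive in order per user. In a Flink job the per-user deque would presumably live in keyed state, with timers driving the cleanup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: counts purchases per user over the trailing hour. */
final class RollingPurchaseCounter {
    private static final long WINDOW_MILLIS = 60L * 60L * 1000L;

    // One deque of purchase timestamps per user ID, oldest first.
    private final Map<String, Deque<Long>> purchasesByUser = new HashMap<>();

    /** Records a purchase; timestamps are assumed to arrive in order per user. */
    void record(String userId, long eventTimeMillis) {
        purchasesByUser
                .computeIfAbsent(userId, k -> new ArrayDeque<>())
                .addLast(eventTimeMillis);
    }

    /** Evicts purchases at or before (queryTime - 1h), then counts those not after queryTime. */
    int countLastHour(String userId, long queryTimeMillis) {
        Deque<Long> timestamps = purchasesByUser.get(userId);
        if (timestamps == null) {
            return 0;
        }
        long cutoff = queryTimeMillis - WINDOW_MILLIS;
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= cutoff) {
            timestamps.pollFirst(); // older than one hour: remove from the stored list
        }
        int count = 0;
        for (long t : timestamps) {
            if (t <= queryTimeMillis) {
                count++;
            }
        }
        return count;
    }
}
```

With purchases recorded at 10:30 and 10:45, a query at 11:29 sees both, while a query at 11:31 sees only the 10:45 purchase because the 10:30 entry has aged out.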

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
I'm not sure what you want to achieve exactly. You can always keyBy the values by a constant pseudo-key such that all values will be in the same partition (so instead of using global but with the same effect). Then you can use a process function to maintain the state. Just make sure that your data

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you, I will have a look at datastream.global. Is there any other way to maintain state, like by using ValueState? On Thu, 6 May 2021 at 1:26 PM, Arvid Heise wrote: > If you keyby then all direct functions see only the elements with the same > key. So that's the expected behavior and the bas

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
If you keyBy then all direct functions see only the elements with the same key. So that's the expected behavior and the base of Flink's parallel processing capabilities. If you want to generate a window over all customers, you have to use a global window. However, that also means that no paralleli
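The effect of the key choice described above can be illustrated with a small plain-Java sketch. It does not use the Flink API: `KeyGroupingSketch` and `groupByKey` are hypothetical names, and ordinary stream grouping stands in for the way a keyed stream partitions elements. Keying by customer ID yields one group per customer, so a function applied per key sees a single customer; keying by a constant puts every element in one group, at the cost of parallelism.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical illustration of how the key selector decides which elements are seen together. */
final class KeyGroupingSketch {

    /** Groups customer IDs by the key the selector returns, one list per distinct key. */
    static <K> Map<K, List<String>> groupByKey(
            List<String> customerIds, Function<String, K> keySelector) {
        return customerIds.stream().collect(Collectors.groupingBy(keySelector));
    }
}
```

For the input `A, B, A, C`, the selector `id -> id` produces three groups (one per customer), while the constant selector `id -> 0` produces a single group holding all four elements.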

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you. sourceContext.collectWithTimestamp(c, c.getEventTime()); Adding this to the source context worked. However, I am still getting only one customer in the process method. I would expect the iterable to provide all customers in the window. Or do I have to maintain state? changes for refe

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
Your source is not setting the timestamp with collectWithTimestamp. I'm assuming that nothing really moves from the event time's perspective. On Thu, May 6, 2021 at 8:58 AM Swagat Mishra wrote: > Yes customer generator is setting the event timestamp correctly like I see > below. I debugged and f

Re: Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
Yes, the customer generator is setting the event timestamp correctly, as I see below. I debugged and found that the events are arriving late, so they are never processed, i.e., in the window operator the method this.isWindowLate(actualWindow) evaluates to false for the rest of the events except the fi

Re: Session Windows - not working as expected

2021-05-05 Thread Arvid Heise
Hi, Is your CustomerGenerator setting the event timestamp correctly? Are your evictors evicting too early? You can try to add some debug output into the watermark assigner and see if it's indeed progressing as expected. On Thu, May 6, 2021 at 12:48 AM Swagat Mishra wrote: > This seems to be wo

Re: Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
This seems to be working fine in processing time but doesn't work in event time. Is there an issue with the way the watermark is defined or do we need to set up timers? Please advise. WORKING: customerStream .keyBy((KeySelector) Customer::getIdentifier) .window(ProcessingTimeSe

Re: Session Windows - not working as expected

2021-05-05 Thread Sam
Adding the code for CustomWatermarkGenerator . @Override public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) { currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime() ); } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput
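The generator quoted above tracks the largest event timestamp seen so far. A plain-Java sketch of that bookkeeping follows; it is not the poster's class and does not implement Flink's `WatermarkGenerator` interface. `MaxTimestampWatermark`, the out-of-orderness bound, and the simplified `isLate` check are all assumptions made for illustration. The watermark is taken as the maximum timestamp minus the bound minus one millisecond, which is the usual bounded-out-of-orderness convention.

```java
/** Hypothetical sketch: derives a watermark from the highest event timestamp seen so far. */
final class MaxTimestampWatermark {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    MaxTimestampWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start high enough that the first watermark computation cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Mirrors the quoted onEvent: only ever moves the maximum forward. */
    void onEvent(long eventTimeMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMillis);
    }

    /** The value a periodic emit would publish at this moment. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    /** Simplified lateness check (assumption): an event at or behind the watermark is late. */
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= currentWatermark();
    }
}
```

If the events carry no usable timestamps, or the maximum never advances, the watermark stays put and event-time windows never fire, which matches the symptom discussed in this thread.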