I meant "Do you recommend the state to be maintained in *ValueState* or an external store like elastic?"
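
To make the question concrete, this is roughly the per-user bookkeeping I mean, as a minimal plain-Java sketch. It uses no Flink API: the per-user deque only stands in for whatever keyed state (or external store) would hold the purchase timestamps, and the names RollingPurchaseCounter, recordPurchase and countAt are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustration only: in a Flink job the per-user deque would live in keyed state.
class RollingPurchaseCounter {
    private static final long WINDOW_MILLIS = 60L * 60L * 1000L; // "last 1 hour"

    // One deque of purchase timestamps (millis) per user id, oldest first.
    private final Map<String, Deque<Long>> purchasesByUser = new HashMap<>();

    // Remember one purchase; assumes timestamps arrive in order per user.
    void recordPurchase(String userId, long timestampMillis) {
        purchasesByUser
            .computeIfAbsent(userId, id -> new ArrayDeque<>())
            .addLast(timestampMillis);
    }

    // Number of purchases with a timestamp in (queryTime - 1h, queryTime].
    // Eviction is destructive, so query times are assumed to be non-decreasing.
    int countAt(String userId, long queryTimeMillis) {
        Deque<Long> purchases = purchasesByUser.get(userId);
        if (purchases == null) {
            return 0;
        }
        // Drop purchases that are one hour old or older.
        while (!purchases.isEmpty()
                && purchases.peekFirst() <= queryTimeMillis - WINDOW_MILLIS) {
            purchases.removeFirst();
        }
        int count = 0;
        for (long ts : purchases) {
            if (ts <= queryTimeMillis) {
                count++;
            }
        }
        return count;
    }
}
```

With the purchases from my example (10:30 and 10:45, and reading the third one as 13:30, which is my assumption here), countAt at 11:29 would give 2 and countAt at 13:45 would give 1. What I am unsure about is only where this per-user collection should live.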
On Thu, May 6, 2021 at 8:46 PM Swagat Mishra <swaga...@gmail.com> wrote:
> I want to aggregate the user activity e.g number of products the user has
> purchased in the last 1 hour.
>
> so - User A (ID = USER-A) purchases a product at 10:30 and another
> product at 10:45 AM and another product at 1:30 AM.
>
> My API should give 2 products purchased if the API call happens at 11:29
> AM (10:30 , 10:45) and 1 product if the API call happens at 1:45 AM
>
> The API will access data persisted from the flink streaming output.
>
> As of now I am doing keyby on (ID = USER-A) .
>
> Do I have to maintain my own calculated state within the process
> window function? Is the process window function shared across all keys or
> one instance per key? Do you recommend the state to be maintained in State
> or elastic?
>
> Also, if I change the processing to processing time instead of event time,
> the aggregation is happening. Any reason why flink could not provide event
> time aggregations like the processing time aggregation?
>
>
>
> On Thu, May 6, 2021 at 7:11 PM Arvid Heise <ar...@apache.org> wrote:
>
>> I'm not sure what you want to achieve exactly.
>>
>> You can always keyby the values by a constant pseudo-key such that all
>> values will be in the same partition (so instead of using global but with
>> the same effect). Then you can use a process function to maintain the
>> state. Just make sure that your data volume is low enough as this part is
>> not parallelizable by definition.
>>
>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>
>>> thank you
>>>
>>> i will have a look at datastream.global
>>>
>>> is there any other way to maintain state like by using valuestate.
>>>
>>>
>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> If you keyby then all direct functions see only the elements with the
>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>> processing capabilities.
>>>>
>>>> If you want to generate a window over all customers, you have to use a
>>>> global window. However, that also means that no parallelization can happen,
>>>> so I'd discourage that.
>>>>
>>>> A better way would be to perform as many calculations as possible in
>>>> the process function (for example create a customer with buy information
>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>> aggregated information on one node.
>>>>
>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <swaga...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you.
>>>>>
>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>
>>>>> Adding this to the source context worked.
>>>>>
>>>>> However I am still getting only one customer in the process method. i
>>>>> would expect the iterable to provide all customers in the window. or do i
>>>>> have to maintain state.
>>>>>
>>>>>
>>>>> changes for reference:
>>>>>
>>>>> I made the following change, also removed any lag that i had introduced
>>>>> for experimentation earlier.
>>>>>
>>>>> so trigger looks like:
>>>>>
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>         // if the watermark is already past the window fire immediately
>>>>>         return TriggerResult.FIRE;
>>>>>     } else {
>>>>>         //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>
>>>>>         triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>         return TriggerResult.FIRE;
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>     return time == timeWindow.maxTimestamp() ?
>>>>>         TriggerResult.FIRE :
>>>>>         TriggerResult.CONTINUE;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>     return TriggerResult.CONTINUE;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>> }
>>>>>
>>>>> @Override
>>>>> public boolean canMerge() {
>>>>>     return true;
>>>>> }
>>>>>
>>>>> and *removed lateness*
>>>>>
>>>>> customerStream
>>>>>     //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>     .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>>>     //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>     .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>     .process(new CustomAggregateFunction());
>>>>>
>>>>>
>>>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Your source is not setting the timestamp with collectWithTimestamp.
>>>>>> I'm assuming that nothing really moves from the event time's perspective.
>>>>>>
>>>>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes customer generator is setting the event timestamp correctly like
>>>>>>> I see below. I debugged and found that the events are getting late, so
>>>>>>> never executed. i.e., in the window operator the method
>>>>>>> this.isWindowLate(actualWindow) is getting executed to false for the
>>>>>>> rest of the events except the first, hence the events are getting
>>>>>>> skipped, not able to figure out where exactly the issue is.
>>>>>>>
>>>>>>> i have removed the evictor because I don't think I need it yet.
>>>>>>>
>>>>>>> stream looks like
>>>>>>>
>>>>>>> customerStream
>>>>>>>     .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>>>>>>     .trigger(new EventTimeTrigger())
>>>>>>>     .process(new CustomAggregateFunction());
>>>>>>>
>>>>>>>
>>>>>>> *Customer generator looks like:*
>>>>>>>
>>>>>>> while (isRunning) {
>>>>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>>>>     System.out.println("Writing customer: " + c);
>>>>>>>     sourceContext.collect(c);
>>>>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>>>>     Thread.sleep(1000);
>>>>>>>     counter++;
>>>>>>>     if (counter % 11 == 0) {
>>>>>>>         System.out.println("Sleeping for 10 seconds");
>>>>>>>         Thread.sleep(10000);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Custom Watermark generator has this:
>>>>>>>
>>>>>>> .....
>>>>>>> @Override
>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>> }
>>>>>>> .....
>>>>>>>
>>>>>>> trigger looks like:
>>>>>>>
>>>>>>> ------
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>>         // if the watermark is already past the window fire immediately
>>>>>>>         return TriggerResult.FIRE;
>>>>>>>     } else {
>>>>>>>         LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>>
>>>>>>>         triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>>>>         return TriggerResult.FIRE;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>>>     // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>>>>>     //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>>>>>     //     return TriggerResult.CONTINUE;
>>>>>>>     // }
>>>>>>>
>>>>>>>     return time == timeWindow.maxTimestamp() ?
>>>>>>>         TriggerResult.FIRE :
>>>>>>>         TriggerResult.CONTINUE;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> ....
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is your CustomerGenerator setting the event timestamp correctly?
>>>>>>>> Are your evictors evicting too early?
>>>>>>>>
>>>>>>>> You can try to add some debug output into the watermark assigner
>>>>>>>> and see if it's indeed progressing as expected.
>>>>>>>>
>>>>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This seems to be working fine in processing time but doesn't work
>>>>>>>>> in event time. Is there an issue with the way the water mark is
>>>>>>>>> defined or do we need to set up timers?
>>>>>>>>>
>>>>>>>>> Please advise.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> WORKING:
>>>>>>>>>
>>>>>>>>> customerStream
>>>>>>>>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>>     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>     .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>>>>     .process(new CustomAggregateFunction());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> NOT WORKING:
>>>>>>>>>
>>>>>>>>> customerStream
>>>>>>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>     .keyBy(Customer::getIdentifier)
>>>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>     .trigger(EventTimeTrigger.create())
>>>>>>>>>     .evictor(new CustomerEvictor())
>>>>>>>>>     .process(new CustomAggregateFunction())
>>>>>>>>>     .print();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Adding the code for CustomWatermarkGenerator
>>>>>>>>>>
>>>>>>>>>> .....
>>>>>>>>>> @Override
>>>>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>>>> }
>>>>>>>>>> .....
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Bit of background, I have a stream of customers who have
>>>>>>>>>>> purchased some product, reading these transactions on a KAFKA
>>>>>>>>>>> topic. I want to aggregate the number of products the customer
>>>>>>>>>>> has purchased in a particular duration ( say 10 seconds ) and
>>>>>>>>>>> write to a sink.
>>>>>>>>>>>
>>>>>>>>>>> I am using session windows to achieve the above.
>>>>>>>>>>>
>>>>>>>>>>> For test purposes, i have mocked up a customer stream and
>>>>>>>>>>> executed session windows like below.
>>>>>>>>>>>
>>>>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>>>>
>>>>>>>>>>> customerStream
>>>>>>>>>>>     //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>>     .keyBy(Customer::getIdentifier)
>>>>>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>>     .trigger(EventTimeTrigger.create())
>>>>>>>>>>>     .evictor(new CustomerEvictor())
>>>>>>>>>>>     .process(new CustomAggregateFunction())
>>>>>>>>>>>     .print();
>>>>>>>>>>>
>>>>>>>>>>> My watermark assigner looks like:
>>>>>>>>>>>
>>>>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I notice that the evictor, and aggregation functions are getting
>>>>>>>>>>> called only once for the first customer in the stream.
>>>>>>>>>>>
>>>>>>>>>>> The data stream is generating customers at 1 seconds interval and
>>>>>>>>>>> there are 5 customer keys for which it's generating transactions.
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>>>>
>>>>>>>>>>> I want to be able to capture the event on each transaction getting
>>>>>>>>>>> added and removed from the window so that I can perform the
>>>>>>>>>>> aggregation.
>>>>>>>>>>>
>>>>>>>>>>> please advise.