Thank you, I will have a look at DataStream.global().
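A note on the DataStream#global() approach Arvid describes in the quoted reply below. The idea is a two-phase aggregation. First, the keyed (and therefore parallel) process function reduces each customer's purchases to one summary record. Then only those small summaries are routed to a single downstream instance, which can compare customers. The sketch below models the two phases in plain Java so it runs without Flink. `TwoPhaseAggregation`, `CustomerSummary`, and the choice of "top customer" as the global result are illustrative assumptions, not code from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (not Flink API) of "pre-aggregate per key, then gather on one node".
 * Phase 1 stands in for the keyed, parallel process function; phase 2 stands in for
 * the single downstream instance that DataStream#global() routes every record to.
 */
public class TwoPhaseAggregation {

    /** Illustrative per-customer summary produced by the keyed phase. */
    public static final class CustomerSummary {
        public final String customerId;
        public final int purchases;

        public CustomerSummary(String customerId, int purchases) {
            this.customerId = customerId;
            this.purchases = purchases;
        }
    }

    /** Phase 1: each key is summarised independently, so this work could run in parallel. */
    public static List<CustomerSummary> summarisePerCustomer(List<String> purchaseCustomerIds) {
        Map<String, Integer> counts = new TreeMap<>(); // TreeMap only for deterministic order
        for (String id : purchaseCustomerIds) {
            counts.merge(id, 1, Integer::sum);
        }
        List<CustomerSummary> summaries = new ArrayList<>();
        counts.forEach((id, n) -> summaries.add(new CustomerSummary(id, n)));
        return summaries;
    }

    /** Phase 2: one "global" instance sees every summary and can compare customers. */
    public static String topCustomer(List<CustomerSummary> summaries) {
        CustomerSummary best = null;
        for (CustomerSummary s : summaries) {
            if (best == null || s.purchases > best.purchases) {
                best = s;
            }
        }
        return best == null ? null : best.customerId;
    }

    /** Phase 2: a cross-customer total, only possible once all summaries are in one place. */
    public static int totalPurchases(List<CustomerSummary> summaries) {
        int total = 0;
        for (CustomerSummary s : summaries) {
            total += s.purchases;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> purchases = List.of("c1", "c2", "c1", "c3", "c1", "c2");
        List<CustomerSummary> summaries = summarisePerCustomer(purchases);
        System.out.println("customers=" + summaries.size()
                + " total=" + totalPurchases(summaries)
                + " top=" + topCustomer(summaries));
    }
}
```

In a Flink job, phase 1 would correspond to the ProcessWindowFunction after keyBy(...), and phase 2 to the operator that follows .global(). Because global() sends every record to the first instance of the next operator, only the reduced summaries should flow through it.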
Is there any other way to maintain state, for example by using ValueState?

On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <ar...@apache.org> wrote:

> If you keyBy, then all directly following functions see only the elements with the same key. That's the expected behavior, and it is the basis of Flink's parallel processing capabilities.
>
> If you want to generate a window over all customers, you have to use a global window. However, that also means that no parallelization can happen, so I'd discourage that.
>
> A better way would be to perform as many calculations as possible in the process function (for example, create a record combining the customer with their purchase information) and then use a DataStream#global() reshuffle to collect all aggregated information on one node.
>
> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Thank you.
>>
>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>
>> Adding this to the source context worked.
>>
>> However, I am still getting only one customer in the process method. I would expect the Iterable to provide all customers in the window. Or do I have to maintain state?
>>
>>
>> Changes for reference:
>>
>> I made the following changes and also removed any lag that I had introduced earlier for experimentation.
>>
>> So the trigger looks like:
>>
>>
>> @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>         // if the watermark is already past the window fire immediately
>>         return TriggerResult.FIRE;
>>     } else {
>>         //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>         triggerContext.registerEventTimeTimer(customer.getEventTime());
>>         return TriggerResult.FIRE;
>>     }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>     return time == timeWindow.maxTimestamp() ?
>>             TriggerResult.FIRE :
>>             TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>     return TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>> }
>>
>> @Override
>> public boolean canMerge() {
>>     return true;
>> }
>>
>> and *removed lateness*
>>
>> customerStream
>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>         .process(new CustomAggregateFunction());
>>
>>
>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Your source is not setting the timestamp with collectWithTimestamp. I'm assuming that nothing really moves from the event time's perspective.
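Some background on the timestamp point Arvid raises. Flink record timestamps, watermarks, and event-time timers are all `long` values. The time-based window assigners (for example `EventTimeSessionWindows.withGap(Time.seconds(5))`, which is a 5000 ms gap) treat those values as milliseconds since the Unix epoch. In this thread, the generator builds each Customer from `LocalTime.now()`, and one trigger variant registers a timer at `toNanoOfDay()`. That is nanoseconds since midnight, a different unit and origin. The plain-Java sketch below shows one way to turn a wall-clock time into epoch milliseconds. The helper name `toEpochMillis` and the use of UTC are assumptions made for reproducibility; `getEventTime()` itself is not shown in the thread.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/**
 * Plain-Java sketch: turning a wall-clock LocalTime into the epoch-millisecond value
 * that Flink's event-time machinery compares against watermarks and window bounds.
 */
public class EventTimeUnits {

    /** Illustrative helper: date + time of day + zone -> milliseconds since 1970-01-01T00:00Z. */
    public static long toEpochMillis(LocalDate date, LocalTime time, ZoneId zone) {
        return date.atTime(time).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2021, 5, 6);
        LocalTime eventTime = LocalTime.of(9, 0);

        // In this sketch, the value a source would pass to collectWithTimestamp(customer, ...):
        long eventMillis = toEpochMillis(day, eventTime, ZoneOffset.UTC);
        // A timer 8 s later, expressed in the same unit as the watermark:
        long timerMillis = toEpochMillis(day, eventTime.plusSeconds(8), ZoneOffset.UTC);
        // What toNanoOfDay() yields instead: nanoseconds since midnight, not epoch millis.
        long nanoOfDay = eventTime.plusSeconds(8).toNanoOfDay();

        System.out.println("event timestamp (epoch ms):          " + eventMillis);
        System.out.println("timer +8 s      (epoch ms):          " + timerMillis);
        System.out.println("toNanoOfDay()   (ns since midnight): " + nanoOfDay);
    }
}
```

Whatever unit is chosen, the timestamps passed to collectWithTimestamp (or produced by a TimestampAssigner), the watermarks, and any registered timers have to agree. A value in nanoseconds of day is not comparable with an epoch-millisecond watermark.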
>>>
>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>
>>>> Yes, the customer generator is setting the event timestamp correctly, as shown below. I debugged and found that the events are arriving late, so they are never processed, i.e. in the window operator, this.isWindowLate(actualWindow) evaluates to false for all events except the first, hence those events are skipped. I am not able to figure out where exactly the issue is.
>>>>
>>>> I have removed the evictor because I don't think I need it yet.
>>>>
>>>> The stream looks like:
>>>>
>>>> customerStream
>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .allowedLateness(Time.seconds(5))
>>>>         .trigger(new EventTimeTrigger())
>>>>         .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> *The customer generator looks like:*
>>>>
>>>> while (isRunning) {
>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>     System.out.println("Writing customer: " + c);
>>>>     sourceContext.collect(c);
>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>     Thread.sleep(1000);
>>>>     counter++;
>>>>     if (counter % 11 == 0) {
>>>>         System.out.println("Sleeping for 10 seconds");
>>>>         Thread.sleep(10000);
>>>>     }
>>>> }
>>>>
>>>>
>>>> The custom watermark generator has this:
>>>>
>>>> .....
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>> }
>>>> .....
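The watermark generator quoted above emits the largest timestamp seen so far as the watermark, with no allowance for out-of-order records. A record that arrives with a timestamp at or below that maximum is therefore already behind the watermark. Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness(...), which appears commented out in this thread with a 7-second bound, instead holds the watermark back by a fixed amount. The plain-Java model below mirrors that idea (watermark = max timestamp - bound - 1) and prints each emitted value, in the spirit of Arvid's suggestion to add debug output to the watermark assigner. `BoundedWatermarkModel` is an illustrative name; it does not implement Flink's WatermarkGenerator interface.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of a bounded-out-of-orderness watermark generator with debug output.
 * It mirrors the idea behind Flink's WatermarkStrategy.forBoundedOutOfOrderness, but it
 * is NOT Flink's WatermarkGenerator interface; all names here are illustrative.
 */
public class BoundedWatermarkModel {

    private final long outOfOrdernessMillis;
    private long currentMaxTimestamp;
    private final List<Long> emitted = new ArrayList<>();

    public BoundedWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that (max - bound - 1) cannot underflow before the first event.
        this.currentMaxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Counterpart of onEvent: only tracks the largest timestamp seen so far. */
    public void onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
    }

    /** Counterpart of onPeriodicEmit: holds the watermark back by the bound and logs it. */
    public long onPeriodicEmit() {
        long watermark = currentMaxTimestamp - outOfOrdernessMillis - 1;
        emitted.add(watermark);
        System.out.println("[debug] max=" + currentMaxTimestamp + " -> watermark=" + watermark);
        return watermark;
    }

    /** A watermark w means no more records with timestamp <= w are expected. */
    public static boolean isLate(long eventTimestampMillis, long watermark) {
        return eventTimestampMillis <= watermark;
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel gen = new BoundedWatermarkModel(7_000); // 7 s bound
        gen.onEvent(10_000);
        gen.onEvent(12_000);
        long wm = gen.onPeriodicEmit(); // 12_000 - 7_000 - 1 = 4_999
        System.out.println("record at 6_000 late? " + isLate(6_000, wm)); // false: within the bound
        System.out.println("record at 4_000 late? " + isLate(4_000, wm)); // true
    }
}
```

Printing the watermark on every periodic emit makes it easy to see whether event time advances at all. If the records carry no timestamp, or timestamps in an unexpected unit, the printed values show it immediately.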
>>>>
>>>> The trigger looks like:
>>>>
>>>> ------
>>>>
>>>>
>>>> @Override
>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>         // if the watermark is already past the window fire immediately
>>>>         return TriggerResult.FIRE;
>>>>     } else {
>>>>         LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>         triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>         return TriggerResult.FIRE;
>>>>     }
>>>> }
>>>>
>>>> @Override
>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>     // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>>     //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>>     //     return TriggerResult.CONTINUE;
>>>>     // }
>>>>
>>>>     return time == timeWindow.maxTimestamp() ?
>>>>             TriggerResult.FIRE :
>>>>             TriggerResult.CONTINUE;
>>>> }
>>>>
>>>>
>>>> ....
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is your CustomerGenerator setting the event timestamp correctly? Are your evictors evicting too early?
>>>>>
>>>>> You can try to add some debug output into the watermark assigner and see if it's indeed progressing as expected.
>>>>>
>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>>>
>>>>>> This seems to be working fine in processing time but doesn't work in event time. Is there an issue with the way the watermark is defined, or do we need to set up timers?
>>>>>>
>>>>>> Please advise.
>>>>>>
>>>>>>
>>>>>> WORKING:
>>>>>>
>>>>>> customerStream
>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>         .process(new CustomAggregateFunction());
>>>>>>
>>>>>>
>>>>>> NOT WORKING:
>>>>>>
>>>>>> customerStream
>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>         .evictor(new CustomerEvictor())
>>>>>>         .process(new CustomAggregateFunction())
>>>>>>         .print();
>>>>>>
>>>>>>
>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>>>
>>>>>>> Adding the code for CustomWatermarkGenerator:
>>>>>>>
>>>>>>> .....
>>>>>>> @Override
>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>> }
>>>>>>> .....
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> A bit of background: I have a stream of customers who have purchased some product, and I am reading these transactions from a Kafka topic. I want to aggregate the number of products each customer has purchased in a particular duration (say 10 seconds) and write the result to a sink.
>>>>>>>>
>>>>>>>> I am using session windows to achieve the above.
>>>>>>>>
>>>>>>>> For test purposes, I have mocked up a customer stream and executed session windows as below.
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>
>>>>>>>> customerStream
>>>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>         .print();
>>>>>>>>
>>>>>>>> My watermark assigner looks like:
>>>>>>>>
>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I notice that the evictor and the aggregation function are called only once, for the first customer in the stream.
>>>>>>>>
>>>>>>>> The data stream generates a customer at 1-second intervals, and there are 5 customer keys for which it generates transactions.
>>>>>>>>
>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>
>>>>>>>> I want to be able to capture an event each time a transaction is added to or removed from the window, so that I can perform the aggregation.
>>>>>>>>
>>>>>>>> Please advise.
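The keyBy behavior Arvid describes at the top of the thread explains why the process method only ever sees one customer. After keyBy, session windows are built per key, so each window's Iterable holds only that customer's purchases. A window is complete once the watermark passes its end. The plain-Java model below mirrors that behavior. Each element opens a window [ts, ts + gap), and windows of the same key that overlap or touch are merged. A window fires once the watermark reaches its last timestamp, end - 1, which is the point Flink's built-in EventTimeTrigger waits for. `KeyedSessionModel` and `Session` are illustrative names; this is a model for reasoning, not Flink's WindowOperator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model of keyed event-time session windows (not Flink's WindowOperator).
 * Each event opens [ts, ts + gap); sessions of the SAME key that overlap or touch merge.
 * A session fires once the watermark reaches its last timestamp (end - 1).
 */
public class KeyedSessionModel {

    /** Illustrative session window: [start, end) plus the event timestamps it holds. */
    public static final class Session {
        long start;
        long end;
        final List<Long> timestamps = new ArrayList<>();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1; // same convention as Flink's TimeWindow.maxTimestamp()
        }
    }

    private final long gapMillis;
    // State is kept per key, which is why one session never mixes customers.
    private final Map<String, List<Session>> sessionsByKey = new TreeMap<>();

    public KeyedSessionModel(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Opens a session for the event and merges it with overlapping sessions of its key. */
    public void add(String key, long ts) {
        Session merged = new Session(ts, ts + gapMillis);
        merged.timestamps.add(ts);
        List<Session> sessions = sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        for (int i = sessions.size() - 1; i >= 0; i--) {
            Session s = sessions.get(i);
            if (s.start <= merged.end && s.end >= merged.start) { // overlap or touch
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.timestamps.addAll(s.timestamps);
                sessions.remove(i);
            }
        }
        sessions.add(merged);
    }

    /** Emits and removes every session whose last timestamp is at or below the watermark. */
    public List<String> fire(long watermark) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, List<Session>> e : sessionsByKey.entrySet()) {
            List<Session> remaining = new ArrayList<>();
            e.getValue().sort(Comparator.comparingLong((Session s) -> s.start));
            for (Session s : e.getValue()) {
                if (s.maxTimestamp() <= watermark) {
                    fired.add(e.getKey() + "[" + s.start + "," + s.end + ") count=" + s.timestamps.size());
                } else {
                    remaining.add(s);
                }
            }
            e.setValue(remaining);
        }
        return fired;
    }

    public static void main(String[] args) {
        KeyedSessionModel model = new KeyedSessionModel(5_000); // 5 s gap, as in the thread
        model.add("c1", 0);
        model.add("c2", 1_000);
        model.add("c1", 2_000); // within 5 s of the c1 event at 0, so it merges
        model.add("c1", 9_000); // 7 s after the previous c1 event: a new c1 session
        System.out.println(model.fire(6_999));  // prints [c1[0,7000) count=2, c2[1000,6000) count=1]
        System.out.println(model.fire(20_000)); // prints [c1[9000,14000) count=1]
    }
}
```

In this model, a cross-customer view exists only if something downstream collects the per-key results. That is the role Arvid assigns to DataStream#global() in his reply above.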