Thank you.

    sourceContext.collectWithTimestamp(c, c.getEventTime());

Adding this to the source context worked.

However, I am still getting only one customer in the process method. I would expect the iterable to provide all customers in the window. Or do I have to maintain state?

Changes for reference:

I made the following change and also removed any lag that I had introduced earlier for experimentation, so the trigger now looks like:

    @Override
    public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
            triggerContext.registerEventTimeTimer(customer.getEventTime());
            return TriggerResult.FIRE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
        return time == timeWindow.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

and removed lateness:

    customerStream
        //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
        .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
        .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
        //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
        .process(new CustomAggregateFunction());

On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:

> Your source is not setting the timestamp with collectWithTimestamp. I'm
> assuming that nothing really moves from the event time's perspective.
>
> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Yes, the customer generator is setting the event timestamp correctly, as
>> shown below. I debugged and found that the events are arriving late and
>> are therefore never processed, i.e., in the window operator the method
>> this.isWindowLate(actualWindow) evaluates to false for the rest of the
>> events except the first, hence the events are getting skipped. I am not
>> able to figure out where exactly the issue is.
>>
>> I have removed the evictor because I don't think I need it yet.
>>
>> stream looks like
>>
>> customerStream
>>     .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>     .trigger(new EventTimeTrigger())
>>     .process(new CustomAggregateFunction());
>>
>> Customer generator looks like:
>>
>> while (isRunning) {
>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>     System.out.println("Writing customer: " + c);
>>     sourceContext.collect(c);
>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>     Thread.sleep(1000);
>>     counter++;
>>     if(counter % 11 == 0) {
>>         System.out.println("Sleeping for 10 seconds");
>>         Thread.sleep(10000);
>>     }
>> }
>>
>> Custom Watermark generator has this:
>>
>> .....
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>> }
>> .....
>>
>> trigger looks like:
>>
>> ------
>>
>> @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>         // if the watermark is already past the window fire immediately
>>         return TriggerResult.FIRE;
>>     } else {
>>         LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>         triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>         return TriggerResult.FIRE;
>>     }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>     // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>     //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>     //     return TriggerResult.CONTINUE;
>>     // }
>>
>>     return time == timeWindow.maxTimestamp() ?
>>             TriggerResult.FIRE :
>>             TriggerResult.CONTINUE;
>> }
>>
>> ....
>>
>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Is your CustomerGenerator setting the event timestamp correctly? Are
>>> your evictors evicting too early?
>>>
>>> You can try to add some debug output into the watermark assigner and see
>>> if it's indeed progressing as expected.
>>>
>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com>
>>> wrote:
>>>
>>>> This seems to be working fine in processing time but doesn't work in
>>>> event time. Is there an issue with the way the watermark is defined or do
>>>> we need to set up timers?
>>>>
>>>> Please advise.
>>>>
>>>> WORKING:
>>>>
>>>> customerStream
>>>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>     .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>     .process(new CustomAggregateFunction());
>>>>
>>>> NOT WORKING:
>>>>
>>>> customerStream
>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>     .keyBy(Customer::getIdentifier)
>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>     .trigger(EventTimeTrigger.create())
>>>>     .evictor(new CustomerEvictor())
>>>>     .process(new CustomAggregateFunction())
>>>>     .print();
>>>>
>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>
>>>>> Adding the code for CustomWatermarkGenerator
>>>>>
>>>>> .....
>>>>> @Override
>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>> }
>>>>> .....
>>>>>
>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A bit of background: I have a stream of customers who have purchased
>>>>>> some product, and I am reading these transactions from a Kafka topic.
>>>>>> I want to aggregate the number of products each customer has purchased
>>>>>> in a particular duration (say 10 seconds) and write to a sink.
>>>>>>
>>>>>> I am using session windows to achieve the above.
>>>>>>
>>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>>> session windows like below.
>>>>>>
>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>
>>>>>> customerStream
>>>>>>     //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>     .keyBy(Customer::getIdentifier)
>>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>     .trigger(EventTimeTrigger.create())
>>>>>>     .evictor(new CustomerEvictor())
>>>>>>     .process(new CustomAggregateFunction())
>>>>>>     .print();
>>>>>>
>>>>>> My watermark assigner looks like:
>>>>>>
>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>
>>>>>>     @Override
>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>         return new CustomWatermarkGenerator();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I notice that the evictor and aggregation functions are getting called
>>>>>> only once, for the first customer in the stream.
>>>>>>
>>>>>> The data stream is generating customers at 1-second intervals and there
>>>>>> are 5 customer keys for which it's generating transactions.
>>>>>>
>>>>>> Am I doing something wrong with the above?
>>>>>>
>>>>>> I want to be able to capture the event on each transaction getting added
>>>>>> and removed from the window so that I can perform the aggregation.
>>>>>>
>>>>>> Please advise.
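
A side note on the timer in the earlier trigger version quoted above, which registers `customer.getLocalTime().plusSeconds(8).toNanoOfDay()`: Flink event-time timestamps, watermarks, and timers are expressed in milliseconds since the epoch, whereas `toNanoOfDay()` returns nanoseconds since midnight, so the two values sit on very different scales. The sketch below is plain Java with no Flink dependency and only illustrates that difference. The class `EventTimeUnits`, the helper `toEpochMillis`, the fixed date, and the use of UTC are assumptions made for illustration; how `Customer.getEventTime()` is actually implemented is not shown in the thread.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

public class EventTimeUnits {

    // Hypothetical helper: combine a LocalTime with a date and express the
    // result as milliseconds since the epoch (UTC is an assumption here).
    static long toEpochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2021, 5, 6); // fixed date, for illustration only
        LocalTime time = LocalTime.NOON;

        long epochMillis = toEpochMillis(date, time); // scale used for event time
        long nanoOfDay = time.toNanoOfDay();          // nanoseconds since midnight

        System.out.println("epoch millis: " + epochMillis);
        System.out.println("nano of day:  " + nanoOfDay);
        // The two numbers are not comparable: a timer registered with a
        // nano-of-day value does not line up with a watermark that advances
        // in epoch milliseconds.
    }
}
```

If `getEventTime()` already returns epoch milliseconds, one option would be to register the timer on the same scale, for example at `timeWindow.maxTimestamp()`, which is where the built-in `EventTimeTrigger` registers its timer.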