Your source is not setting the timestamp with collectWithTimestamp, so I assume that nothing really moves from an event-time perspective.
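Here is one way to see why only the first customer gets through. Below is a simplified, Flink-free model of the lateness check that WindowOperator#isWindowLate applies to event-time windows: a window is dropped once its max timestamp plus the allowed lateness is at or behind the current watermark. The model rests on two assumptions from my reading of the code, which I have not verified for every version. First, a record that the source emits with plain collect() reaches the window with timestamp Long.MIN_VALUE. Second, the custom watermark generator still advances the watermark from customer.getEventTime(). LatenessModel and its methods are made-up names for illustration.

```java
/**
 * Simplified model (not Flink code) of the event-time lateness check for a
 * session window created by a single element.
 * ASSUMPTION: a record without a source timestamp is modelled as Long.MIN_VALUE.
 */
final class LatenessModel {
    static final long GAP_MILLIS = 5_000;              // withGap(Time.seconds(5))
    static final long ALLOWED_LATENESS_MILLIS = 5_000; // allowedLateness(Time.seconds(5))

    private LatenessModel() {}

    /** A single-element session window is [ts, ts + gap); its max timestamp is end - 1. */
    static long windowMaxTimestamp(long elementTimestamp) {
        return elementTimestamp + GAP_MILLIS - 1;
    }

    /**
     * The window counts as late once max timestamp + allowed lateness is at or
     * behind the watermark. Overflow near Long.MAX_VALUE is ignored in this model.
     */
    static boolean isWindowLate(long elementTimestamp, long currentWatermark) {
        long cleanupTime = windowMaxTimestamp(elementTimestamp) + ALLOWED_LATENESS_MILLIS;
        return cleanupTime <= currentWatermark;
    }
}
```

With these numbers, the first element arrives while the watermark is still Long.MIN_VALUE and is kept. Once a watermark derived from a real event time has been emitted, every window that starts at Long.MIN_VALUE counts as late. Giving each record a real epoch-millisecond timestamp should avoid that. You can do this either with sourceContext.collectWithTimestamp(c, eventTimeMillis) in the generator, or by adding .withTimestampAssigner((customer, recordTimestamp) -> customer.getEventTime()) to the watermark strategy. The second option assumes getEventTime() returns epoch milliseconds.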
On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
> Yes, CustomerGenerator is setting the event timestamp correctly, as shown
> below. I debugged and found that the events are arriving late, so they are never
> processed: in the window operator, this.isWindowLate(actualWindow) flags the
> window as late for every event except the first, so those events are skipped.
> I haven't been able to figure out where exactly the issue is.
>
> I have removed the evictor because I don't think I need it yet.
>
> The stream looks like this:
>
> customerStream
>     .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>     .allowedLateness(Time.seconds(5))
>     .trigger(new EventTimeTrigger())
>     .process(new CustomAggregateFunction());
>
> Customer generator looks like:
>
> while (isRunning) {
>     Customer c = new Customer(CUSTOMER_KEY[counter % 5],
>             LocalTime.now(), // that's the event time
>             1000);
>     System.out.println("Writing customer: " + c);
>     sourceContext.collect(c);
>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>     Thread.sleep(1000);
>     counter++;
>     if (counter % 11 == 0) {
>         System.out.println("Sleeping for 10 seconds");
>         Thread.sleep(10000);
>     }
> }
>
> Custom Watermark generator has this:
>
> .....
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
> }
> .....
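Two things stand out in the generator code above. LocalTime.now() only carries the time of day, whereas Flink timestamps and watermarks are milliseconds since the epoch, so an epoch-based value such as System.currentTimeMillis() is the safer choice for the event time. Also, the watermark generator emits currentMaxTimestamp unchanged, with no allowance for out-of-order events. The sketch below models, without Flink, the bookkeeping that I believe Flink's built-in BoundedOutOfOrdernessWatermarks performs. It starts low enough that the first watermark is Long.MIN_VALUE without underflow, tracks the largest timestamp seen, and emits that value minus the bound minus one. BoundedLagWatermarks is a hypothetical name. It deliberately does not implement Flink's WatermarkGenerator interface, so it runs on its own.

```java
/**
 * Flink-free sketch of a periodic watermark generator that tracks the highest
 * event timestamp seen so far (epoch milliseconds) and lags it by a fixed bound.
 * Hypothetical class; modelled on my understanding of bounded out-of-orderness.
 */
final class BoundedLagWatermarks {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedLagWatermarks(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start so that the first watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every event, like WatermarkGenerator#onEvent. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /** The value a periodic emit would publish, like WatermarkGenerator#onPeriodicEmit. */
    long currentWatermark() {
        // A watermark W promises no more events with timestamp <= W,
        // hence the extra -1 after subtracting the allowed lag.
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

In the job itself, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)), which is already commented out in the original pipeline, should give this behavior when combined with a timestamp assigner.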
>
> Trigger looks like:
>
> ------
>
> @Override
> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
>         TriggerContext triggerContext) throws Exception {
>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>         // if the watermark is already past the window, fire immediately
>         return TriggerResult.FIRE;
>     } else {
>         LOGGER.info("Max timestamp for customer: " + customer.getIdentifier()
>                 + " is: " + timeWindow.maxTimestamp());
>         triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>         return TriggerResult.FIRE;
>     }
> }
>
> @Override
> public TriggerResult onEventTime(long time, TimeWindow timeWindow,
>         TriggerContext triggerContext) {
>     // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>     //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>     //     return TriggerResult.CONTINUE;
>     // }
>
>     return time == timeWindow.maxTimestamp() ?
>             TriggerResult.FIRE :
>             TriggerResult.CONTINUE;
> }
>
> ....
>
> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> Is your CustomerGenerator setting the event timestamp correctly? Are your
>> evictors evicting too early?
>>
>> You can try to add some debug output to the watermark assigner and see
>> if it's indeed progressing as expected.
>>
>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>
>>> This seems to work fine in processing time but doesn't work in
>>> event time. Is there an issue with the way the watermark is defined, or do
>>> we need to set up timers?
>>>
>>> Please advise.
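On the timer question: the quoted onElement registers its timer at customer.getLocalTime().plusSeconds(8).toNanoOfDay(), which is nanoseconds since midnight. However, onEventTime only fires when the timer time equals timeWindow.maxTimestamp(), which is in epoch milliseconds. The two will practically never be equal, so when that timer fires, onEventTime returns CONTINUE. Also note that both branches of onElement return FIRE, so the window fires on every element rather than once at the end. As far as I know, Flink's own EventTimeTrigger registers its timer at window.maxTimestamp(), so its equality check lines up. The plain-Java model below makes the unit mismatch concrete for an event at 08:58:00. It uses a hypothetical Decision enum in place of TriggerResult, and all names are illustrative.

```java
import java.time.LocalTime;

/** Hypothetical stand-in for Flink's TriggerResult, so the sketch runs without Flink. */
enum Decision { FIRE, CONTINUE }

/** Plain-Java model of the timer logic in the quoted custom trigger. */
final class TimerUnitsModel {
    private TimerUnitsModel() {}

    /** Max timestamp of a single-element session window, in epoch milliseconds. */
    static long sessionWindowMaxTimestamp(long eventMillis, long gapMillis) {
        return eventMillis + gapMillis - 1;
    }

    /** Timer as the quoted onElement registers it: nanoseconds since midnight. */
    static long timerFromLocalTime(LocalTime eventTime) {
        return eventTime.plusSeconds(8).toNanoOfDay();
    }

    /** Same decision as the quoted onEventTime. */
    static Decision onEventTime(long timerTime, long windowMaxTimestampMillis) {
        return timerTime == windowMaxTimestampMillis ? Decision.FIRE : Decision.CONTINUE;
    }
}
```

A timer registered at sessionWindowMaxTimestamp(...) fires the window, while the nanosecond-of-day timer never matches it.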
>>>
>>> WORKING:
>>>
>>> customerStream
>>>     .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>     .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>     .process(new CustomAggregateFunction());
>>>
>>> NOT WORKING:
>>>
>>> customerStream
>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>     .keyBy(Customer::getIdentifier)
>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>     .trigger(EventTimeTrigger.create())
>>>     .evictor(new CustomerEvictor())
>>>     .process(new CustomAggregateFunction())
>>>     .print();
>>>
>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>
>>>> Adding the code for CustomWatermarkGenerator:
>>>>
>>>> .....
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>> }
>>>> .....
>>>>
>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> A bit of background: I have a stream of customers who have purchased
>>>>> some product, and I read these transactions from a Kafka topic. I want to
>>>>> aggregate the number of products each customer has purchased in a particular
>>>>> duration (say 10 seconds) and write the result to a sink.
>>>>>
>>>>> I am using session windows to achieve the above.
>>>>>
>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>> session windows like below.
>>>>>
>>>>> StreamExecutionEnvironment environment =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>
>>>>> customerStream
>>>>>     //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>     .keyBy(Customer::getIdentifier)
>>>>>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>     .trigger(EventTimeTrigger.create())
>>>>>     .evictor(new CustomerEvictor())
>>>>>     .process(new CustomAggregateFunction())
>>>>>     .print();
>>>>>
>>>>> My watermark assigner looks like:
>>>>>
>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>
>>>>>     @Override
>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(
>>>>>             WatermarkGeneratorSupplier.Context context) {
>>>>>         return new CustomWatermarkGenerator();
>>>>>     }
>>>>> }
>>>>>
>>>>> I notice that the evictor and the aggregation function are called
>>>>> only once, for the first customer in the stream.
>>>>>
>>>>> The data stream generates a customer every second, and there
>>>>> are 5 customer keys for which it generates transactions.
>>>>>
>>>>> Am I doing something wrong with the above?
>>>>>
>>>>> I want to be able to capture each transaction as it is added to
>>>>> and removed from the window so that I can perform the aggregation.
>>>>>
>>>>> Please advise.
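Since the goal is a count per customer over a fixed duration (say 10 seconds), it may be worth checking whether a tumbling event-time window, TumblingEventTimeWindows.of(Time.seconds(10)), fits better than a session window. A session window only closes after 5 seconds without events for that key. With 5 keys emitted round-robin once per second, each key sees an event roughly every 5 seconds, which is right at the session gap. Whether consecutive purchases land in the same session can therefore depend on a few milliseconds of jitter. Below is a Flink-free sketch of the bucketing a tumbling window performs, with windows aligned to multiples of the size and no offset, plus a per-key count. TumblingCounts and the "key@windowStart" bucket format are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Flink-free sketch: count purchases per customer per fixed-size event-time window. */
final class TumblingCounts {
    private TumblingCounts() {}

    /** Start of the window containing the timestamp; windows align to multiples of the size. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Counts events per "customerKey@windowStart"; keys[i] happened at timestampsMillis[i]. */
    static Map<String, Integer> countPerWindow(String[] keys, long[] timestampsMillis, long sizeMillis) {
        if (keys.length != timestampsMillis.length) {
            throw new IllegalArgumentException("keys and timestamps must have the same length");
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String bucket = keys[i] + "@" + windowStart(timestampsMillis[i], sizeMillis);
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }
}
```

In the Flink job, the analogous change would be to swap the session window for the tumbling one. As far as I know, the default trigger for event-time windows then fires once when the watermark passes the window end, so the custom trigger and evictor would not be needed for a plain count.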