Hi,

A bit of background: I have a stream of customer purchase transactions that I read from a Kafka topic. I want to aggregate the number of products each customer has purchased within a particular duration (say 10 seconds) and write the result to a sink.
I am using session windows to achieve the above. For test purposes, I have mocked up a customer stream and applied session windows as shown below.

    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());

    customerStream
        //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
        .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(EventTimeTrigger.create())
        .evictor(new CustomerEvictor())
        .process(new CustomAggregateFunction())
        .print();

My watermark assigner looks like this:

    public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
        static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);

        @Override
        public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomWatermarkGenerator();
        }
    }

I notice that the evictor and the aggregation function are called only once, for the first customer in the stream. The data stream generates one customer transaction per second, spread across 5 customer keys.

Am I doing something wrong with the above? I want to be notified each time a transaction is added to or removed from the window so that I can update the aggregation. Please advise.
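
To make the intended result concrete, here is a minimal plain-Java sketch (no Flink APIs) of the per-customer session counting I am trying to achieve. The names SessionCountSketch, Purchase, SessionCount and countPerSession are made up for illustration. It assumes purchases arrive sorted by timestamp and that a customer's session closes once that customer has been inactive for longer than the gap; it is not meant to reproduce Flink's exact window semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SessionCountSketch {

    /** Hypothetical input record: a customer key plus an event time in milliseconds. */
    static final class Purchase {
        final String customerId;
        final long timestampMs;

        Purchase(String customerId, long timestampMs) {
            this.customerId = customerId;
            this.timestampMs = timestampMs;
        }
    }

    /** Hypothetical output record: how many purchases fell into one session. */
    static final class SessionCount {
        final String customerId;
        final long startMs;
        final long lastEventMs;
        final int count;

        SessionCount(String customerId, long startMs, long lastEventMs, int count) {
            this.customerId = customerId;
            this.startMs = startMs;
            this.lastEventMs = lastEventMs;
            this.count = count;
        }
    }

    /**
     * Groups purchases per customer into sessions separated by more than gapMs
     * of inactivity. Assumes the input list is sorted by timestamp.
     */
    static List<SessionCount> countPerSession(List<Purchase> purchases, long gapMs) {
        // customerId -> {sessionStart, lastEventTime, count} for the session still open
        Map<String, long[]> open = new LinkedHashMap<>();
        List<SessionCount> result = new ArrayList<>();

        for (Purchase p : purchases) {
            long[] s = open.get(p.customerId);
            if (s != null && p.timestampMs - s[1] > gapMs) {
                // The pause exceeded the gap: emit the old session and start a new one.
                result.add(new SessionCount(p.customerId, s[0], s[1], (int) s[2]));
                s = null;
            }
            if (s == null) {
                open.put(p.customerId, new long[] {p.timestampMs, p.timestampMs, 1});
            } else {
                s[1] = p.timestampMs; // extend the open session
                s[2]++;
            }
        }

        // End of input: emit whatever sessions are still open.
        for (Map.Entry<String, long[]> e : open.entrySet()) {
            long[] s = e.getValue();
            result.add(new SessionCount(e.getKey(), s[0], s[1], (int) s[2]));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Purchase> purchases = Arrays.asList(
                new Purchase("A", 0L),
                new Purchase("B", 1_000L),
                new Purchase("A", 2_000L),
                new Purchase("B", 3_000L),
                new Purchase("A", 9_000L));

        for (SessionCount sc : countPerSession(purchases, 5_000L)) {
            System.out.println(sc.customerId + " [" + sc.startMs + ", " + sc.lastEventMs + "] -> " + sc.count);
        }
    }
}
```

In the Flink job I would expect one such result per customer and session, emitted from CustomAggregateFunction when the session window fires.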