Adding the code for CustomWatermarkGenerator:

.....
    @Override
    public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
        // Track the highest event time seen so far.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // Periodically emit the highest event time seen so far as the watermark.
        watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
    }
.....

On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi,
>
> A bit of background: I have a stream of customers who have purchased some
> product, and I am reading these transactions from a Kafka topic. I want to
> aggregate the number of products each customer has purchased in a particular
> duration (say 10 seconds) and write the result to a sink.
>
> I am using session windows to achieve the above.
>
> For test purposes, I have mocked up a customer stream and executed
> session windows like below.
>
> StreamExecutionEnvironment environment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Customer> customerStream =
>         environment.addSource(new CustomerGenerator());
>
> customerStream
>     //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>     .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>     .keyBy(Customer::getIdentifier)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>     .trigger(EventTimeTrigger.create())
>     .evictor(new CustomerEvictor())
>     .process(new CustomAggregateFunction())
>     .print();
>
> My watermark assigner looks like:
>
> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>     static final Logger logger =
>             LoggerFactory.getLogger(WaterMarkAssigner.class);
>
>     @Override
>     public WatermarkGenerator<Customer> createWatermarkGenerator(
>             WatermarkGeneratorSupplier.Context context) {
>         return new CustomWatermarkGenerator();
>     }
> }
>
> I notice that the evictor and aggregation functions are getting called only
> once, for the first customer in the stream.
>
> The data stream is generating customers at 1-second intervals, and there are 5
> customer keys for which it's generating transactions.
>
> Am I doing something wrong with the above?
>
> I want to be able to capture an event each time a transaction is added to or
> removed from the window so that I can perform the aggregation.
>
> Please advise.
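
For anyone following the thread, here is a minimal plain-Java sketch (not Flink code, and not a diagnosis of the job above) of how I understand a session window with a 5-second gap to interact with a watermark that simply tracks the maximum event time, as in onPeriodicEmit. The class and method names (SessionWindowSketch, sessions, fires) and the sample timestamps are made up for illustration. The merge rule (overlapping or touching ranges join one session) and the firing rule (watermark reaches window end minus 1 ms) are simplified assumptions about the event-time semantics.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of session-window merging and event-time firing. */
final class SessionWindowSketch {

    /**
     * Groups one key's event timestamps (ascending, in ms) into sessions.
     * Each event covers [ts, ts + gapMs); ranges that overlap or touch are merged.
     * Returns {start, end} pairs, where end is exclusive.
     */
    static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // The event falls inside the current session, so extend its end.
                last[1] = Math.max(last[1], ts + gapMs);
            } else {
                // The gap has elapsed (or this is the first event): start a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    /** Assumed firing rule: the watermark has reached the window's last timestamp (end - 1). */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] events = {0L, 1_000L, 2_000L, 9_000L}; // sample event times for one key
        long watermark = 9_000L; // maximum event time seen so far
        for (long[] s : sessions(events, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ") fires=" + fires(s[1], watermark));
        }
    }
}
```

In this model the first three events merge into one session, [0, 7000), which is complete once the watermark reaches 9000, while the session opened by the last event, [9000, 14000), stays pending until a later event pushes the watermark to at least 13999. A key that keeps receiving events less than 5 seconds apart would keep extending a single session under the same rules.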