Hi,

A bit of background: I have a stream of customers who have purchased some
product, and I read these transactions from a Kafka topic. I want to aggregate
the number of products each customer has purchased within a particular duration
(say, 10 seconds) and write the result to a sink.

I am using session windows to achieve the above.

For test purposes, I have mocked up a customer stream and applied session
windows as follows:

StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());

customerStream
        //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
        .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(EventTimeTrigger.create())
        .evictor(new CustomerEvictor())
        .process(new CustomAggregateFunction())
        .print();

// Nothing runs until the job is submitted.
environment.execute();

My watermark assigner looks like:

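import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
    static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);

    @Override
    public WatermarkGenerator<Customer> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new CustomWatermarkGenerator();
    }
}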
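CustomWatermarkGenerator itself is not shown. For comparison, here is a standalone, plain-Java model (no Flink dependency; the class name is hypothetical) of the bookkeeping that Flink's built-in bounded-out-of-orderness generator performs, i.e. what the commented-out forBoundedOutOfOrderness(Duration.ofSeconds(7)) line would use: remember the highest event timestamp seen and emit that minus the allowed delay minus 1 ms. An event-time session window only fires once the watermark reaches its last timestamp, so a generator whose watermark stalls keeps windows open. Separately, neither forGenerator(...) nor WaterMarkAssigner supplies a timestamp assigner (there is no withTimestampAssigner(...) call), so as far as I know the default one is used, which simply keeps whatever timestamp the source already attached to each record.

```java
// Standalone model (plain Java, no Flink dependency) of the bookkeeping done by
// Flink's built-in bounded-out-of-orderness watermark generator. It is NOT the
// poster's CustomWatermarkGenerator, whose code is not shown.
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    // Start low enough that the first reported watermark is exactly Long.MIN_VALUE.
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Mirrors WatermarkGenerator#onEvent: remember the highest event timestamp seen.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Mirrors WatermarkGenerator#onPeriodicEmit: the watermark value that would be emitted.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(7_000);
        System.out.println(gen.currentWatermark()); // Long.MIN_VALUE before any event
        gen.onEvent(10_000);
        gen.onEvent(8_000); // an out-of-order event does not move the watermark back
        System.out.println(gen.currentWatermark()); // 2999
    }
}
```

The watermark never moves backwards: after events at 10 000 ms and then 8 000 ms, it stays at 2 999 ms with a 7-second bound.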

I notice that the evictor and the aggregation function are only called once,
for the first customer in the stream.

The data stream generates one customer every second, and there are 5
customer keys for which it generates transactions.
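To reason about when these sessions close, the following standalone simulation (plain Java, no Flink dependency; class and method names are hypothetical) assigns events like the generator described above and merges per-key session windows using the same inclusive overlap test as Flink's TimeWindow#intersects. It assumes, which the post does not state, that the generator cycles round-robin through the 5 keys and stamps each event with its emission time. Under those assumptions each key sees an event every 5 seconds, exactly the 5-second gap, so its sessions keep merging into one growing window that only ends once that key goes quiet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Standalone simulation (plain Java, no Flink dependency) of per-key event-time
// session windows. Assumed, not taken from the post: the generator cycles
// round-robin through the keys, emits one event per interval, and each event's
// timestamp is its emission time.
final class SessionWindowSimulation {

    // Half-open interval [start, end), shaped like Flink's TimeWindow.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Inclusive overlap test, as in TimeWindow#intersects: touching windows merge.
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Returns the sessions per key after all events have been assigned and merged.
    static Map<String, List<Window>> sessions(int events, int keys, long intervalMs, long gapMs) {
        Map<String, List<Window>> byKey = new TreeMap<>();
        for (int i = 0; i < events; i++) {
            String key = "customer-" + (i % keys);
            long ts = i * intervalMs;
            // Every event starts as its own session [ts, ts + gap).
            Window merged = new Window(ts, ts + gapMs);
            List<Window> kept = new ArrayList<>();
            for (Window w : byKey.getOrDefault(key, new ArrayList<>())) {
                if (w.intersects(merged)) {
                    merged = merged.cover(w); // absorb an overlapping or touching session
                } else {
                    kept.add(w);
                }
            }
            kept.add(merged);
            byKey.put(key, kept);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // 20 events, 5 keys, one event per second, 5-second gap as in the pipeline.
        sessions(20, 5, 1_000, 5_000)
                .forEach((key, windows) -> System.out.println(key + " -> " + windows));
    }
}
```

With 20 events, each key ends up with exactly one session; for example, customer-0 (events at 0, 5 000, 10 000 and 15 000 ms) covers [0, 20000).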

Am I doing something wrong with the above?

I want to be able to capture an event each time a transaction is added to
or removed from the window, so that I can perform the aggregation.
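On capturing each transaction as it is added: per Flink's windowing documentation, an evictor is invoked only after the trigger fires (before and/or after the window function), not on every element, and specifying one prevents pre-aggregation. An AggregateFunction passed to .aggregate(...), by contrast, is updated incrementally as each element arrives. The sketch below is a standalone, plain-Java model (no Flink dependency; the class name and the products-per-transaction input are hypothetical) that mirrors AggregateFunction's four methods, including the merge step that session windows need.

```java
// Standalone model (plain Java, no Flink dependency) of an incremental count with
// the same four methods as Flink's AggregateFunction<IN, ACC, OUT>. The class name
// and the "products per transaction" input are hypothetical.
final class ProductCountModel {

    // Empty accumulator for a new session window.
    static long createAccumulator() {
        return 0L;
    }

    // Invoked once per transaction as it is added to the window.
    static long add(int productsInTransaction, long acc) {
        return acc + productsInTransaction;
    }

    // Session windows merge, so the partial counts of merged windows are summed.
    static long merge(long a, long b) {
        return a + b;
    }

    // Value emitted when the window fires.
    static long getResult(long acc) {
        return acc;
    }

    public static void main(String[] args) {
        long sessionA = add(2, add(1, createAccumulator())); // 3 products
        long sessionB = add(4, createAccumulator());          // 4 products
        // Two sessions of the same customer merge when a new event bridges the gap.
        System.out.println(getResult(merge(sessionA, sessionB))); // 7
    }
}
```

Because the accumulator is a single number, no individual transactions need to be kept in window state, which is the main reason to prefer this shape over buffering elements for an evictor.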

Please advise.
