Adding the code for CustomWatermarkGenerator

.....
@Override
public void onEvent(Customer customer, long eventTimestamp, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
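For reference, here is a dependency-free sketch of the same max-timestamp tracking logic, so it can be run and checked outside a Flink job. The `Customer` record and `MaxTimestampTracker` class below are hypothetical stand-ins for the real Flink types; in the actual job this logic lives inside a class implementing org.apache.flink.api.common.eventtime.WatermarkGenerator<Customer>.

```java
import java.util.List;

public class WatermarkSketch {

    // Hypothetical stand-in for the Customer event type.
    record Customer(String id, long eventTime) {}

    // Tracks the highest event time seen so far; onPeriodicEmit would
    // publish this value as the current watermark.
    static class MaxTimestampTracker {
        private long currentMaxTimestamp = Long.MIN_VALUE;

        void onEvent(Customer customer) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.eventTime());
        }

        long currentWatermark() {
            return currentMaxTimestamp;
        }
    }

    public static void main(String[] args) {
        MaxTimestampTracker tracker = new MaxTimestampTracker();
        // Out-of-order events: the tracked maximum only ever moves forward.
        for (Customer c : List.of(
                new Customer("a", 1_000L),
                new Customer("b", 3_000L),
                new Customer("a", 2_000L))) { // late event, does not regress
            tracker.onEvent(c);
        }
        System.out.println(tracker.currentWatermark()); // prints 3000
    }
}
```

Note that because the watermark tracks only the max event time seen, an idle source will stop advancing it, which can keep event-time session windows from ever firing.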


On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:

> Hi,
>
> Bit of background, I have a stream of customers who have purchased some
> product, reading these transactions on a KAFKA topic. I want to aggregate
> the number of products the customer has purchased in a particular duration
> ( say 10 seconds ) and write to a sink.
>
> I am using session windows to achieve the above.
>
> For test purposes, I have mocked up a customer stream and executed
> session windows like below.
>
> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>
> customerStream
>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>         .keyBy(Customer::getIdentifier)
>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>         .trigger(EventTimeTrigger.create())
>         .evictor(new CustomerEvictor())
>         .process(new CustomAggregateFunction())
>         .print();
>
> My watermark assigner looks like:
>
> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>
>     @Override
>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>         return new CustomWatermarkGenerator();
>     }
> }
>
> I notice that the evictor and aggregation functions are getting called
> only once, for the first customer in the stream.
>
> The data stream is generating customers at 1-second intervals, and there
> are 5 customer keys for which it's generating transactions.
>
> Am I doing something wrong with the above?
>
> I want to be able to capture the event on each transaction getting added and 
> removed from the window so that I can perform the aggregation.
>
> Please advise.
>
