Hi Anuj Jain, You need to provide the type parameter when calling WatermarkStrategy.forBoundedOutOfOrderness like this:
bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15)) Regards, Roman On Fri, Aug 28, 2020 at 6:49 AM aj <ajainje...@gmail.com> wrote: > > I am getting this error when trying to assign watermark in Flink 1.11 > > *"Cannot resolve method 'withTimestampAssigner(anonymous > org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<org.apache.avro.generic.GenericRecord>)'"* > > FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new > FlinkKafkaConsumer(topics, > new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), > properties); > > bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15)) > .withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() > { > @Override > public long extractTimestamp(GenericRecord genericRecord, long l) { > return (long)genericRecord.get("event_ts"); > } > })); > > > What is wrong with this. > > In Flink 1.9 I was using this function and it was working fine > > public static class SessionAssigner implements > AssignerWithPunctuatedWatermarks<GenericRecord> { > > @Override > public long extractTimestamp(GenericRecord record, long > previousElementTimestamp) { > long timestamp = (long) record.get("event_ts"); > // LOGGER.info("timestamp----", timestamp); > return timestamp; > } > > @Override > public Watermark checkAndGetNextWatermark(GenericRecord record, long > extractedTimestamp) { > // simply emit a watermark with every event > // LOGGER.info("extractedTimestamp ", extractedTimestamp); > return new Watermark(extractedTimestamp); > } > } > > > -- > Thanks & Regards, > Anuj Jain > > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> >