Hi Anuj Jain,

You need to provide the type parameter when calling
WatermarkStrategy.forBoundedOutOfOrderness like this:

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))

Regards,
Roman


On Fri, Aug 28, 2020 at 6:49 AM aj <ajainje...@gmail.com> wrote:

>
> I am getting this error when trying to assign watermark in Flink  1.11
>
> *"Cannot resolve method 'withTimestampAssigner(anonymous
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<org.apache.avro.generic.GenericRecord>)'"*
>
> FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new 
> FlinkKafkaConsumer(topics,
>     new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>     properties);
>
> bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
>     .withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() 
> {
>       @Override
>       public long extractTimestamp(GenericRecord genericRecord, long l) {
>         return (long)genericRecord.get("event_ts");
>       }
>     }));
>
>
> What is wrong with this.
>
> In Flink 1.9 I was using this function and it was working fine
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks<GenericRecord> {
>
>   @Override
>   public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
>     long timestamp = (long) record.get("event_ts");
>     // LOGGER.info("timestamp----", timestamp);
>     return timestamp;
>   }
>
>   @Override
>   public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
>     // simply emit a watermark with every event
>     // LOGGER.info("extractedTimestamp ", extractedTimestamp);
>     return new Watermark(extractedTimestamp);
>   }
> }
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to