I am getting this error when trying to assign watermark in Flink  1.11

*"Cannot resolve method 'withTimestampAssigner(anonymous
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<org.apache.avro.generic.GenericRecord>)'"*

FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new
FlinkKafkaConsumer(topics,
    new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
    properties);

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
    .withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() {
      @Override
      public long extractTimestamp(GenericRecord genericRecord, long l) {
        return (long)genericRecord.get("event_ts");
      }
    }));


What is wrong with this.

In Flink 1.9 I was using this function and it was working fine

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks<GenericRecord> {

  @Override
  public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
    long timestamp = (long) record.get("event_ts");
    // LOGGER.info("timestamp----", timestamp);
    return timestamp;
  }

  @Override
  public Watermark checkAndGetNextWatermark(GenericRecord record, long
extractedTimestamp) {
    // simply emit a watermark with every event
    // LOGGER.info("extractedTimestamp ", extractedTimestamp);
    return new Watermark(extractedTimestamp);
  }
}


-- 
Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to