Hi, I'm taking the timestamp from the event payload that I'm receiving from Kafka.
I'm struggling to get the time, and I'm confused about how I should use the function ".withTimestampAssigner()". I'm getting an error on event.getTime() that says "Cannot resolve method 'getTime' in 'Object'", and I really don't understand how to fix it. My class provides a long, so the value itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
            new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

    WatermarkStrategy<Event> wmStrategy =
            WatermarkStrategy
                    .forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> {
                        return event.getTime();
                    });

    DataStream<Event> stream = env.addSource(
            kafkaData.assignTimestampsAndWatermarks(wmStrategy));

To give you an idea of the whole project, this is the EventDeserializationSchema class:

    public class EventDeserializationSchema implements DeserializationSchema<Event> {

        private static final long serialVersionUID = 1L;

        private static final CsvSchema schema = CsvSchema.builder()
                .addColumn("firstName")
                .addColumn("lastName")
                .addColumn("age", CsvSchema.ColumnType.NUMBER)
                .addColumn("time")
                .build();

        private static final ObjectMapper mapper = new CsvMapper();

        @Override
        public Event deserialize(byte[] message) throws IOException {
            return mapper.readerFor(Event.class).with(schema).readValue(message);
        }

        @Override
        public boolean isEndOfStream(Event nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }

And this is the Event class:

    public class Event implements Serializable {

        public String firstName;
        public String lastName;
        private int age;
        public Long time;

        public Event() {
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public long getTime() {
            return time;
        }

        public void setTime(String kafkaTime) {
            long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
            this.time = tn;
        }
    }
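
For context on the error message itself: a plausible cause (not a confirmed diagnosis) is Java's generic type inference. In a chained call such as WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(...), the compiler has to pick the type parameter of the first call before it looks at the assignment target, so it falls back to Object, and the lambda's "event" parameter is then an Object with no getTime(). The sketch below reproduces that situation with plain Java only. Strategy, TimestampAssigner and the simplified Event are hypothetical stand-ins, not Flink classes. In Flink, the analogous change would be to write WatermarkStrategy.<Event>forMonotonousTimestamps().

```java
import java.io.Serializable;

public class TypeWitnessDemo {

    // Hypothetical, simplified stand-in for the Event class in the question.
    public static class Event implements Serializable {
        private final long time;

        public Event(long time) {
            this.time = time;
        }

        public long getTime() {
            return time;
        }
    }

    // Hypothetical stand-in for a timestamp assigner callback.
    @FunctionalInterface
    public interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical stand-in for a strategy built by a generic static factory
    // followed by a chained configuration method.
    public static class Strategy<T> {
        // Default: keep whatever timestamp the record already carries.
        private TimestampAssigner<T> assigner = (element, ts) -> ts;

        public static <T> Strategy<T> forMonotonousTimestamps() {
            return new Strategy<>();
        }

        public Strategy<T> withTimestampAssigner(TimestampAssigner<T> assigner) {
            this.assigner = assigner;
            return this;
        }

        public long timestampOf(T element) {
            return assigner.extractTimestamp(element, Long.MIN_VALUE);
        }
    }

    public static Strategy<Event> buildStrategy() {
        // The explicit <Event> type witness fixes T for the factory call.
        // Without it, T is inferred as Object for the chained call and
        // event.getTime() does not compile.
        return Strategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTime());
    }

    public static void main(String[] args) {
        Strategy<Event> strategy = buildStrategy();
        // Prints the timestamp taken from the event payload.
        System.out.println(strategy.timestampOf(new Event(1_600_000_000_000L)));
    }
}
```

An alternative that avoids the type witness is to split the chain in two statements, assigning the result of the factory call to a Strategy<Event> variable first, so the compiler can infer T from the declared type.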