Hi Till,

That's great! Thank you so much!!! I have spent one week on this. I'm so relieved!

Cheers
s

________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: 06 November 2020 17:56
To: Simone Cavallarin <cavalla...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek <aljos...@apache.org>
Subject: Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when chaining methods [1]. The solution would be

    WatermarkStrategy<Event> wmStrategy = WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
            return event.getTime();
        });

@Aljoscha Krettek I think we need to update the documentation about it. We have some examples which don't take this into account.

[1] https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com> wrote:

Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused on how I should use the function ".withTimestampAssigner()". I'm receiving an error on event.getTime() that is telling me: "cannot resolve method 'Get Time' in 'Object'" and I really don't understand how I can fix it. My class is providing a long so the variable itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

    WatermarkStrategy<Event> wmStrategy = WatermarkStrategy
        .forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
            return event.getTime();
        });

    DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

And to give you the idea of the whole project, this is the EventDeserializationSchema class:

    public class EventDeserializationSchema implements DeserializationSchema<Event> {

        private static final long serialVersionUID = 1L;

        private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

        private static final ObjectMapper mapper = new CsvMapper();

        @Override
        public Event deserialize(byte[] message) throws IOException {
            return mapper.readerFor(Event.class).with(schema).readValue(message);
        }

        @Override
        public boolean isEndOfStream(Event nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }

And this is the Event class:

    public class Event implements Serializable {

        public String firstName;
        public String lastName;
        private int age;
        public Long time;

        public Event() {
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public long getTime() {
            return time;
        }

        public void setTime(String kafkaTime) {
            long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
            this.time = tn;
        }
    }
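
For readers who want to reproduce the type-inference behaviour Till describes without a Flink setup, here is a minimal sketch in plain Java. The `Strategy` and `Assigner` interfaces are hypothetical stand-ins written for this illustration, not Flink's `WatermarkStrategy` API; they only mimic the shape of the two chained calls from the thread. The `Event` class is reduced to the `getTime()` accessor. The sketch shows two ways to give the compiler the element type: an explicit type witness (the fix from the thread) and an intermediate typed variable.

```java
class TypeWitnessDemo {

    // Hypothetical stand-in for a timestamp assigner: (element, recordTimestamp) -> long.
    interface Assigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical stand-in for a strategy with a generic static factory
    // followed by a chained instance method, as in the thread.
    interface Strategy<T> {
        long timestampOf(T element);

        static <T> Strategy<T> forMonotonousTimestamps() {
            // Default behaviour for this sketch: no timestamp known.
            return element -> Long.MIN_VALUE;
        }

        default Strategy<T> withTimestampAssigner(Assigner<T> assigner) {
            return element -> assigner.extractTimestamp(element, Long.MIN_VALUE);
        }
    }

    // Reduced version of the Event class: only the timestamp accessor.
    static class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // Does NOT compile if uncommented: the target type Strategy<Event> is not
    // propagated to the receiver of a chained call, so T is inferred as Object
    // and getTime() cannot be resolved.
    //
    // Strategy<Event> broken = Strategy
    //         .forMonotonousTimestamps()
    //         .withTimestampAssigner((event, timestamp) -> event.getTime());

    // Fix 1: explicit type witness on the generic factory method.
    static long timestampViaWitness(long time) {
        Strategy<Event> strategy = Strategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTime());
        return strategy.timestampOf(new Event(time));
    }

    // Fix 2: break the chain so the assignment supplies the target type.
    static long timestampViaLocal(long time) {
        Strategy<Event> base = Strategy.forMonotonousTimestamps();
        Strategy<Event> strategy =
                base.withTimestampAssigner((event, timestamp) -> event.getTime());
        return strategy.timestampOf(new Event(time));
    }

    public static void main(String[] args) {
        System.out.println(timestampViaWitness(42L));
        System.out.println(timestampViaLocal(7L));
    }
}
```

Both variants give the compiler the element type before the lambda is checked. The type witness keeps the fluent chain intact, which is presumably why it is the form suggested in the reply above.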