@Till For instances where we use withTimestampAssigner() the examples in the docs always use the explicit generic parameter. (See event_timestamps_watermarks.md and streaming_analytics.md). For cases where we don't use withTimestampAssigner() we don't need the extra generic parameter because the compiler can figure it out.
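To make the difference concrete, here is a minimal, Flink-free sketch of the inference behaviour described above. `Strategy`, `Assigner`, and `Event` are hypothetical stand-ins written only for this illustration (they mimic the shape of `WatermarkStrategy` and a two-argument timestamp assigner, not the real Flink classes); the point is only where the explicit `<Event>` type argument is needed.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Flink classes.
class InferenceDemo {

    // Stand-in for the event type from the thread.
    static class Event {
        private final long time;
        Event(long time) { this.time = time; }
        long getTime() { return time; }
    }

    // Stand-in for a two-argument timestamp assigner lambda.
    interface Assigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Stand-in for a strategy with a generic static factory and a chained method.
    static class Strategy<T> {
        // Default: keep the timestamp already attached to the record.
        private Assigner<T> assigner = (element, recordTimestamp) -> recordTimestamp;

        static <T> Strategy<T> forMonotonousTimestamps() {
            return new Strategy<>();
        }

        Strategy<T> withTimestampAssigner(Assigner<T> assigner) {
            this.assigner = assigner;
            return this;
        }

        long extract(T element, long recordTimestamp) {
            return assigner.extractTimestamp(element, recordTimestamp);
        }
    }

    public static void main(String[] args) {
        // No chained call: the assignment target tells the compiler that T = Event.
        Strategy<Event> plain = Strategy.forMonotonousTimestamps();

        // Chained call: the factory call is no longer the assignment target,
        // so T has to be given explicitly.
        Strategy<Event> chained = Strategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTime());

        // Without the explicit type argument, T is inferred as Object and
        // event.getTime() does not compile:
        // Strategy<Event> broken = Strategy.forMonotonousTimestamps()
        //         .withTimestampAssigner((event, timestamp) -> event.getTime());

        System.out.println(plain.extract(new Event(42L), 7L));   // prints 7
        System.out.println(chained.extract(new Event(42L), 7L)); // prints 42
    }
}
```

The same pattern should carry over to the real API: the type argument goes on the first generic call in the chain, as in `WatermarkStrategy.<Event>forMonotonousTimestamps()`.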
But yes, the Java compiler is not very helpful here... 😅

Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:
> Glad to hear it!
>
> Cheers,
> Till
>
> On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <cavalla...@hotmail.com>
> wrote:
>
>> Hi Till,
>>
>> That's great! thank you so much!!! I have spent one week on this. I'm so
>> relieved!
>>
>> Cheers
>>
>> s
>>
>>
>> ------------------------------
>> *From:* Till Rohrmann <trohrm...@apache.org>
>> *Sent:* 06 November 2020 17:56
>> *To:* Simone Cavallarin <cavalla...@hotmail.com>
>> *Cc:* user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek <
>> aljos...@apache.org>
>> *Subject:* Re: How to use properly the function:
>> withTimestampAssigner((event, timestamp) ->..
>>
>> Hi Simone,
>>
>> The problem is that the Java 1.8 compiler cannot do type inference when
>> chaining methods [1].
>>
>> The solution would be
>>
>>     WatermarkStrategy<Event> wmStrategy =
>>             WatermarkStrategy
>>                     .<Event>forMonotonousTimestamps()
>>                     .withTimestampAssigner((event, timestamp) -> {
>>                         return event.getTime();
>>                     });
>>
>> @Aljoscha Krettek <aljos...@apache.org> I think we need to update the
>> documentation about it. We have some examples which don't take this into
>> account.
>>
>> [1]
>> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com>
>> wrote:
>>
>> Hi,
>>
>> I'm taking the timestamp from the event payload that I'm receiving from
>> Kafka.
>>
>> I'm struggling to get the time and I'm confused on how I should use the
>> function ".withTimestampAssigner()". I'm receiving an error on
>> event.getTime() that is telling me: *"cannot resolve method "Get Time" in
>> "Object"* and I really don't understand how I can fix it. My class is
>> providing a long so the variable itself should be fine. Any help would be
>> really appreciated.
>>
>> *This is my code:*
>>
>>     FlinkKafkaConsumer<Event> kafkaData =
>>             new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
>>     WatermarkStrategy<Event> wmStrategy =
>>             WatermarkStrategy
>>                     .forMonotonousTimestamps()
>>                     .withTimestampAssigner((event, timestamp) -> {
>>                         return event.getTime();
>>                     });
>>
>>     DataStream<Event> stream = env.addSource(
>>             kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>>
>> And to give you the idea of the whole project,
>>
>> *This is the EventDeserializationSchema class:*
>>
>>     public class EventDeserializationSchema implements DeserializationSchema<Event> {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         private static final CsvSchema schema = CsvSchema.builder()
>>                 .addColumn("firstName")
>>                 .addColumn("lastName")
>>                 .addColumn("age", CsvSchema.ColumnType.NUMBER)
>>                 .addColumn("time")
>>                 .build();
>>
>>         private static final ObjectMapper mapper = new CsvMapper();
>>
>>         @Override
>>         public Event deserialize(byte[] message) throws IOException {
>>             return mapper.readerFor(Event.class).with(schema).readValue(message);
>>         }
>>
>>         @Override
>>         public boolean isEndOfStream(Event nextElement) {
>>             return false;
>>         }
>>
>>         @Override
>>         public TypeInformation<Event> getProducedType() {
>>             return TypeInformation.of(Event.class);
>>         }
>>     }
>>
>> *And this is the Event Class:*
>>
>>     public class Event implements Serializable {
>>         public String firstName;
>>         public String lastName;
>>         private int age;
>>         public Long time;
>>
>>         public Event() {
>>         }
>>
>>         public String getFirstName() {
>>             return firstName;
>>         }
>>
>>         public void setFirstName(String firstName) {
>>             this.firstName = firstName;
>>         }
>>
>>         public String getLastName() {
>>             return lastName;
>>         }
>>
>>         public void setLastName(String lastName) {
>>             this.lastName = lastName;
>>         }
>>
>>         public int getAge() {
>>             return age;
>>         }
>>
>>         public void setAge(int age) {
>>             this.age = age;
>>         }
>>
>>         public long getTime() {
>>             return time;
>>         }
>>
>>         public void setTime(String kafkaTime) {
>>             long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
>>             this.time = tn;
>>         }
>>     }