I think in the JavaDocs of the WatermarkStrategy we give an incorrect example. I have created an issue [1] to fix the problem.
[1] https://issues.apache.org/jira/browse/FLINK-20156 Cheers, Till On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek <aljos...@apache.org> wrote: > @Till For instances where we use withTimestampAssigner() the examples in > the docs always use the explicit generic parameter. (See > event_timestamps_watermarks.md and streaming_analytics.md). For cases > where we don't use withTimestampAssigner() we don't need the extra > generic parameter because the compiler can figure it out. > > But yes, the Java compiler is not very helpful here... 😅 > > > Best, > Aljoscha > > On 09.11.20 09:35, Till Rohrmann wrote: > > Glad to hear it! > > > > Cheers, > > Till > > > > On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <cavalla...@hotmail.com > > > > wrote: > > > >> Hi Till, > >> > >> That's great! thank you so much!!! I have spent one week on this. I'm so > >> relieved! > >> > >> Cheers > >> > >> s > >> > >> > >> ------------------------------ > >> *From:* Till Rohrmann <trohrm...@apache.org> > >> *Sent:* 06 November 2020 17:56 > >> *To:* Simone Cavallarin <cavalla...@hotmail.com> > >> *Cc:* user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek < > >> aljos...@apache.org> > >> *Subject:* Re: How to use properly the function: > >> withTimestampAssigner((event, timestamp) ->.. > >> > >> Hi Simone, > >> > >> The problem is that the Java 1.8 compiler cannot do type inference when > >> chaining methods [1]. > >> > >> The solution would be > >> > >> WatermarkStrategy<Event> wmStrategy = > >> WatermarkStrategy > >> .<Event>forMonotonousTimestamps() > >> .withTimestampAssigner((event, timestamp) -> { > >> return event.getTime(); > >> }); > >> > >> @Aljoscha Krettek <aljos...@apache.org> I think we need to update the > >> documentation about it. We have some examples which don't take this into > >> account. > >> > >> [1] > >> > https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/ > >> > >> Cheers, > >> Till > >> > >> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin < > cavalla...@hotmail.com> > >> wrote: > >> > >> Hi, > >> > >> I'm taking the timestamp from the event payload that I'm receiving from > >> Kafka. > >> > >> I'm struggling to get the time and I'm confused on how I should use the > >> function ".withTimestampAssigner()". I'm receiving an error on event. > >> getTime() that is telling me: *"cannot resolve method "Get Time" in > >> "Object"* and I really don't understand how I can fix it. My class is > >> providing a long so the variable itself should be fine. Any help would > be > >> really appreciated. > >> > >> *This is my code:* > >> > >> * FlinkKafkaConsumer<Event> kafkaData =* > >> * new FlinkKafkaConsumer("CorID_0", new > >> EventDeserializationSchema(), p);* > >> * WatermarkStrategy<Event> wmStrategy =* > >> * WatermarkStrategy* > >> * .forMonotonousTimestamps()* > >> * .withTimestampAssigner((event, timestamp) -> { > >> return event.**getTime();* > >> * });* > >> > >> * DataStream<Event> stream = env.addSource(* > >> * kafkaData.assignTimestampsAndWatermarks(wmStrategy));* > >> > >> > >> And to give you the idea of the whole project, > >> > >> *This is the EventDeserializationSchema class:* > >> > >> *public class EventDeserializationSchema implements > >> DeserializationSchema<Event> {* > >> > >> * private static final long serialVersionUID = 1L;* > >> > >> > >> * private static final CsvSchema schema = CsvSchema.builder()* > >> * .addColumn("firstName")* > >> * .addColumn("lastName")* > >> * .addColumn("age", CsvSchema.ColumnType.NUMBER)* > >> * .addColumn("time")* > >> * .build();* > >> > >> * private static final ObjectMapper mapper = new CsvMapper();* > >> > >> * @Override* > >> * public Event deserialize(byte[] message) throws IOException {* > >> * return > >> mapper.readerFor(Event.class).with(schema).readValue(message);* > >> * }* > >> > >> * @Override* > >> * public boolean isEndOfStream(Event nextElement) {* > >> * return false;* > >> * }* > >> > >> * @Override* > >> * public TypeInformation<Event> getProducedType() {* > >> > >> * return TypeInformation.of(Event.class);* > >> * }* > >> *}* > >> > >> *And this is the Event Class:* > >> > >> *public class Event implements Serializable {* > >> * public String firstName;* > >> * public String lastName;* > >> * private int age;* > >> * public Long time;* > >> > >> > >> > >> * public Event() {* > >> * }* > >> > >> * public String getFirstName() {* > >> * return firstName;* > >> * }* > >> > >> * public void setFirstName(String firstName) {* > >> * this.firstName = firstName;* > >> * }* > >> > >> * public String getLastName() {* > >> * return lastName;* > >> * }* > >> > >> * public void setLastName(String lastName) {* > >> * this.lastName = lastName;* > >> * }* > >> > >> * public int getAge() {* > >> * return age;* > >> * }* > >> > >> * public void setAge(int age) {* > >> * this.age = age;* > >> * }* > >> > >> * public long getTime() {* > >> * return time;* > >> * }* > >> > >> * public void setTime(String kafkaTime) {* > >> * long tn = > >> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();* > >> * this.time = tn;* > >> * }* > >> *}* > >> > >> > >> > >> > >> > >> > > > >