I think in the JavaDocs of the WatermarkStrategy we give an incorrect
example. I have created an issue [1] to fix the problem.

[1] https://issues.apache.org/jira/browse/FLINK-20156

Cheers,
Till

On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> @Till For instances where we use withTimestampAssigner() the examples in
> the docs always use the explicit generic parameter. (See
> event_timestamps_watermarks.md and streaming_analytics.md). For cases
> where we don't use withTimestampAssigner() we don't need the extra
> generic parameter because the compiler can figure it out.
>
> But yes, the Java compiler is not very helpful here... 😅
>
>
> Best,
> Aljoscha
>
> On 09.11.20 09:35, Till Rohrmann wrote:
> > Glad to hear it!
> >
> > Cheers,
> > Till
> >
> > On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <cavalla...@hotmail.com
> >
> > wrote:
> >
> >> Hi Till,
> >>
> >> That's great! thank you so much!!! I have spent one week on this. I'm so
> >> relieved!
> >>
> >> Cheers
> >>
> >> s
> >>
> >>
> >> ------------------------------
> >> *From:* Till Rohrmann <trohrm...@apache.org>
> >> *Sent:* 06 November 2020 17:56
> >> *To:* Simone Cavallarin <cavalla...@hotmail.com>
> >> *Cc:* user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek <
> >> aljos...@apache.org>
> >> *Subject:* Re: How to use properly the function:
> >> withTimestampAssigner((event, timestamp) ->..
> >>
> >> Hi Simone,
> >>
> >> The problem is that the Java 1.8 compiler cannot do type inference when
> >> chaining methods [1].
> >>
> >> The solution would be
> >>
> >> WatermarkStrategy<Event> wmStrategy =
> >>                  WatermarkStrategy
> >>                          .<Event>forMonotonousTimestamps()
> >>                          .withTimestampAssigner((event, timestamp) -> {
> >> return event.getTime();
> >>                          });
> >>
> >> @Aljoscha Krettek <aljos...@apache.org> I think we need to update the
> >> documentation about it. We have some examples which don't take this into
> >> account.
> >>
> >> [1]
> >>
> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <
> cavalla...@hotmail.com>
> >> wrote:
> >>
> >> Hi,
> >>
> >> I'm taking the timestamp from the event payload that I'm receiving from
> >> Kafka.
> >>
> >> I'm struggling to get the time and I'm confused on how I should use the
> >> function ".withTimestampAssigner()". I'm receiving an error on event.
> >> getTime() that is telling me: *"cannot resolve method "Get Time" in
> >> "Object"* and I really don't understand how I can fix it.  My class is
> >> providing a long so the variable itself should be fine. Any help would
> be
> >> really appreciated.
> >>
> >> *This is my code:*
> >>
> >> *    FlinkKafkaConsumer<Event> kafkaData =*
> >> *                new FlinkKafkaConsumer("CorID_0", new
> >> EventDeserializationSchema(), p);*
> >> *        WatermarkStrategy<Event> wmStrategy =*
> >> *                WatermarkStrategy*
> >> *                        .forMonotonousTimestamps()*
> >> *                        .withTimestampAssigner((event, timestamp) -> {
> >> return event.**getTime();*
> >> *                        });*
> >>
> >> *        DataStream<Event> stream = env.addSource(*
> >> *                kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
> >>
> >>
> >> And to give you the idea of the whole project,
> >>
> >> *This is the EventDeserializationSchema class:*
> >>
> >> *public class EventDeserializationSchema implements
> >> DeserializationSchema<Event> {*
> >>
> >> *    private static final long serialVersionUID = 1L;*
> >>
> >>
> >> *    private static final CsvSchema schema = CsvSchema.builder()*
> >> *            .addColumn("firstName")*
> >> *            .addColumn("lastName")*
> >> *            .addColumn("age", CsvSchema.ColumnType.NUMBER)*
> >> *            .addColumn("time")*
> >> *            .build();*
> >>
> >> *    private static final ObjectMapper mapper = new CsvMapper();*
> >>
> >> *    @Override*
> >> *    public Event deserialize(byte[] message) throws IOException {*
> >> *        return
> >> mapper.readerFor(Event.class).with(schema).readValue(message);*
> >> *    }*
> >>
> >> *    @Override*
> >> *    public boolean isEndOfStream(Event nextElement) {*
> >> *        return false;*
> >> *    }*
> >>
> >> *    @Override*
> >> *    public TypeInformation<Event> getProducedType() {*
> >>
> >> *        return TypeInformation.of(Event.class);*
> >> *    }*
> >> *}*
> >>
> >> *And this is the Event Class:*
> >>
> >> *public class Event implements Serializable {*
> >> *    public String firstName;*
> >> *    public String lastName;*
> >> *    private int age;*
> >> *    public Long time;*
> >>
> >>
> >>
> >> *    public Event() {*
> >> *    }*
> >>
> >> *    public String getFirstName() {*
> >> *        return firstName;*
> >> *    }*
> >>
> >> *    public void setFirstName(String firstName) {*
> >> *        this.firstName = firstName;*
> >> *    }*
> >>
> >> *    public String getLastName() {*
> >> *        return lastName;*
> >> *    }*
> >>
> >> *    public void setLastName(String lastName) {*
> >> *        this.lastName = lastName;*
> >> *    }*
> >>
> >> *    public int getAge() {*
> >> *        return age;*
> >> *    }*
> >>
> >> *    public void setAge(int age) {*
> >> *        this.age = age;*
> >> *    }*
> >>
> >> *    public long getTime() {*
> >> *        return time;*
> >> *    }*
> >>
> >> *    public void setTime(String kafkaTime) {*
> >> *        long tn =
> >> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();*
> >> *        this.time = tn;*
> >> *    }*
> >> *}*
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
>

Reply via email to