Hi Till,

That's great! Thank you so much! I spent a week on this. I'm so relieved!

Cheers

s


________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: 06 November 2020 17:56
To: Simone Cavallarin <cavalla...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek 
<aljos...@apache.org>
Subject: Re: How to use properly the function: withTimestampAssigner((event, 
timestamp) ->..

Hi Simone,

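The problem is that the Java 1.8 compiler cannot infer the generic type of 
forMonotonousTimestamps() when further methods are chained onto it [1], so 
the lambda's event parameter is typed as Object.

The solution is to specify the type explicitly: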

WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> {
                            return event.getTime();
                        });
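
To show the inference issue in isolation, here is a minimal sketch that does 
not depend on Flink. Strategy, create(), withAssigner() and the Event stub 
are hypothetical stand-ins that only mimic the shape of the chained call; 
they are not Flink's API. The sketch assumes nothing beyond the JDK.

```java
import java.util.function.ToLongFunction;

// Hypothetical demo class; none of these names come from Flink.
class TypeWitnessDemo {

    /** Stand-in for a generic builder such as a watermark strategy. */
    static final class Strategy<T> {
        private final ToLongFunction<T> assigner;

        private Strategy(ToLongFunction<T> assigner) {
            this.assigner = assigner;
        }

        /** Generic factory: T can only be inferred from the call's target type. */
        static <T> Strategy<T> create() {
            return new Strategy<>(element -> Long.MIN_VALUE);
        }

        /** Chained method whose lambda parameter has type T. */
        Strategy<T> withAssigner(ToLongFunction<T> newAssigner) {
            return new Strategy<>(newAssigner);
        }

        long timestampOf(T element) {
            return assigner.applyAsLong(element);
        }
    }

    /** Minimal event stub with a timestamp accessor. */
    static final class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // Does not compile: create() is the receiver of a chained call, so it has
    // no target type, T is inferred as Object, and Object has no getTime().
    // Strategy<Event> broken = Strategy.create().withAssigner(e -> e.getTime());

    /** Option 1: an explicit type witness on the generic factory call. */
    static Strategy<Event> withTypeWitness() {
        return Strategy.<Event>create().withAssigner(e -> e.getTime());
    }

    /** Option 2: an intermediate variable gives create() a target type. */
    static Strategy<Event> withIntermediateVariable() {
        Strategy<Event> base = Strategy.create();
        return base.withAssigner(e -> e.getTime());
    }

    public static void main(String[] args) {
        Event event = new Event(1604681760000L);
        System.out.println(withTypeWitness().timestampOf(event));
        System.out.println(withIntermediateVariable().timestampOf(event));
    }
}
```

Both options compile because the factory call is given its type argument 
before the lambda is checked; the type witness keeps everything in a single 
chained expression, which is why it is the form used in the fix above.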

@Aljoscha Krettek, I think we need to update the documentation about it. We 
have some examples which don't take this into account.

[1] 
https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com> 
wrote:
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused about how I should use the 
function ".withTimestampAssigner()". I'm receiving an error on event.getTime() 
that tells me: "Cannot resolve method 'getTime' in 'Object'", and I really 
don't understand how I can fix it. My class provides a long, so the 
variable itself should be fine. Any help would be really appreciated.

This is my code:

        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> {
                            return event.getTime();
                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;


    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {

        return TypeInformation.of(Event.class);
    }
}

And this is the Event Class:

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;



    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}




