Thank you very much Aljoscha!

2018-02-23 14:45 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
>
> This is a know issue: https://issues.apache.org/jira/browse/FLINK-8500.
> And yes, the workaround is to write an assigner from scratch but you can
> start by copying the code of AscendingTimestampExtractor.
>
> Sorry for the inconvenience.
>
> --
> Aljoscha
>
> On 22. Feb 2018, at 12:05, Federico D'Ambrosio <fedex...@gmail.com> wrote:
>
> Hello everyone,
>
> I'm consuming from a Kafka topic, on which I'm writing with a
> FlinkKafkaProducer, with the timestamp relative flag set to true.
>
> From what I gather from the documentation [1], Flink is aware of Kafka
> Record's timestamp and only the watermark should be set with an appropriate
> TimestampExtractor, still I'm failing to understand how to implement it in
> the right way.
>
> I thought that it would be possible to use the already existent
> AscendingTimestampExtractor, overriding the extractTimestamp method, but
> it's marked final.
>
> new FlinkKafkaConsumer010[Event](ingestion_topic, new 
> JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>       def extractAscendingTimestamp(element: Event): Long = ???
>     })
>
> Should I need to implement my own TimestampExtractor (with the appropriate
> getCurrentWatermark and extractTimestamp methods) ?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#using-kafka-
> timestamps-and-flink-event-time-in-kafka-010
>
> Thank you,
> Federico
>
>
>


-- 
Federico D'Ambrosio

Reply via email to