Thank you very much, Aljoscha!

2018-02-23 14:45 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
>
> This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500.
> And yes, the workaround is to write an assigner from scratch, but you can
> start by copying the code of AscendingTimestampExtractor.
>
> Sorry for the inconvenience.
>
> --
> Aljoscha
>
> On 22. Feb 2018, at 12:05, Federico D'Ambrosio <fedex...@gmail.com> wrote:
>
> Hello everyone,
>
> I'm consuming from a Kafka topic, on which I'm writing with a
> FlinkKafkaProducer, with the timestamp relative flag set to true.
>
> From what I gather from the documentation [1], Flink is aware of the Kafka
> record's timestamp and only the watermark should be set with an appropriate
> TimestampExtractor, but I'm still failing to understand how to implement it
> in the right way.
>
> I thought that it would be possible to use the already existing
> AscendingTimestampExtractor, overriding the extractTimestamp method, but
> it's marked final.
>
> new FlinkKafkaConsumer010[Event](ingestion_topic,
>     new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>     def extractAscendingTimestamp(element: Event): Long = ???
>   })
>
> Do I need to implement my own TimestampExtractor (with the appropriate
> getCurrentWatermark and extractTimestamp methods)?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>
> Thank you,
> Federico

-- 
Federico D'Ambrosio
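
P.S. For anyone who finds this thread later, here is a minimal sketch of the workaround as I understand it. It is not the official fix for FLINK-8500. It assumes the Flink 1.4 AssignerWithPeriodicWatermarks interface and that the records arrive with ascending Kafka timestamps. The class name KafkaRecordTimestampAssigner is my own invention; the watermark logic mirrors what AscendingTimestampExtractor does, minus its monotony-violation handling.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical class name, not part of Flink.
// Keeps the timestamp already attached to each record (for FlinkKafkaConsumer010
// this is the Kafka record timestamp) and derives ascending watermarks from it.
class KafkaRecordTimestampAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue means no record seen yet.
  private var currentTimestamp: Long = Long.MinValue

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    // Only ever move the tracked maximum forward.
    if (previousElementTimestamp > currentTimestamp) {
      currentTimestamp = previousElementTimestamp
    }
    // Return the record's existing timestamp unchanged.
    previousElementTimestamp
  }

  override def getCurrentWatermark(): Watermark = {
    // Trail the highest timestamp by 1 ms, guarding against underflow
    // before the first record has been seen.
    val ts = if (currentTimestamp == Long.MinValue) Long.MinValue else currentTimestamp - 1
    new Watermark(ts)
  }
}
```

With that in place, the consumer from the original question would be wired up with .assignTimestampsAndWatermarks(new KafkaRecordTimestampAssigner[Event]) instead of the anonymous AscendingTimestampExtractor. If the timestamps are not strictly ascending, late records fall behind the watermark, so a bounded-out-of-orderness variant may be the safer choice.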