Hello everyone,

I'm consuming from a Kafka topic that I write to with a
FlinkKafkaProducer, with the timestamp-related flag set to true.

From what I gather from the documentation [1], Flink is already aware of
the Kafka record's timestamp, so only the watermark needs to be set with an
appropriate TimestampExtractor. Still, I'm failing to understand how to
implement this correctly.

I thought it would be possible to use the existing
AscendingTimestampExtractor and override its extractTimestamp method, but
that method is marked final.

new FlinkKafkaConsumer010[Event](
    ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
  .setStartFromLatest()
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
    def extractAscendingTimestamp(element: Event): Long = ???
  })

Do I need to implement my own TimestampExtractor (with the appropriate
getCurrentWatermark and extractTimestamp methods)?
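
For illustration, a minimal and untested sketch of what such a custom
assigner might look like. It assumes that the Kafka record timestamp is
handed in as the previousElementTimestamp argument, as the documentation [1]
describes for the 0.10 consumer, and that timestamps are ascending per
partition. The class name KafkaTimestampAssigner is hypothetical.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical assigner: keeps the timestamp already attached to the record
// (assumed to be the Kafka record timestamp) and derives the watermark from
// the highest timestamp seen so far.
class KafkaTimestampAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue means "nothing seen yet".
  private var maxTimestamp: Long = Long.MinValue

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, previousElementTimestamp)
    // Leave the record's timestamp unchanged.
    previousElementTimestamp
  }

  override def getCurrentWatermark(): Watermark =
    // Stay one millisecond behind the highest timestamp seen, so that
    // records carrying exactly that timestamp are not considered late.
    new Watermark(
      if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - 1)
}
```

Under these assumptions it would be passed to the consumer with
assignTimestampsAndWatermarks(new KafkaTimestampAssigner[Event]) in place of
the anonymous AscendingTimestampExtractor.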

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010


Thank you,
Federico
