Hi Jia,

The Kafka 0.10 connector internally relies on the Kafka 0.9 fetcher, but that fetcher is extensible / pluggable, so the 0.10 connector can still read the event timestamps from Kafka 0.10. We don't expose the timestamp through the deserialization API, because we set it internally in Flink: each record carries a "hidden" field containing the event time of the event.

With a custom operator you can access the event time of a record, for example with a sketch like the one below.
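Something along these lines should work (an untested sketch, not code from the Flink codebase: it assumes the Flink 1.2 streaming API, and the class name TimestampExtractor and the Tuple2 output shape are just made up for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Custom one-input operator that copies the record's internal timestamp
// (the "hidden" field mentioned above) into the visible value.
public class TimestampExtractor<T> extends AbstractStreamOperator<Tuple2<T, Long>>
        implements OneInputStreamOperator<T, Tuple2<T, Long>> {

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // For records coming from the Kafka 0.10 consumer, getTimestamp()
        // holds the timestamp that Kafka attached to the message.
        long timestamp = element.getTimestamp();
        output.collect(new StreamRecord<>(
                new Tuple2<>(element.getValue(), timestamp), timestamp));
    }
}

You would wire it in with DataStream#transform, roughly:

stream.transform(
        "extract-kafka-timestamp",
        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
        new TimestampExtractor<String>());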
On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiat...@gmail.com> wrote:

> Hi,
>
> Is there a way to retrieve the timestamps that Kafka associates with each
> key-value pair within Flink? I would like to be able to use these as
> values within my application flow, and defining them before or after
> Kafka is not acceptable for the use case due to the latency involved in
> sending or receiving from Kafka.
>
> It seems that Flink supports Kafka event time (link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>)
> but after a brief trace it seems that KafkaConsumer010 still relies on the
> Kafka09Fetcher
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L137>
> for iterating through each Kafka record and deserializing it. The
> KeyedDeserializationSchema API does not seem to have support for including
> the timestamp as additional metadata (just offset, topic, and partition),
> so something such as JSONKeyValueDeserializationSchema will not return the
> Kafka-specified timestamp.
>
> For reference, I am using Kafka 0.10.2 and the Flink Streaming API + Kafka
> Connector (1.2.1).
>
> Thanks,
> Jia Teoh