Hi Fabian.
If I use ProcessFunction , I can get it! But I want to know that how
to get Kafka timestamp in like flatmap and map methods of datastream using
scala programming language.
Thanks!
Best
Ben
> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <[email protected]> wrote:
>
> Hi Navneeth,
>
> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if
> you configure EventTime for an application [1].
> Since Flink treats record timestamps as meta data, they are not directly
> accessible by most functions. You can implement a ProcessFunction [2] to
> access the timestamp of a record via the ProcessFunction's Context object.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction>
>
> 2018-03-30 7:45 GMT+02:00 Ben Yan <[email protected]
> <mailto:[email protected]>>:
> hi,
> Is that what you mean?
> See :
> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>
> <https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377145#comment-16377145>
>
>
> Best
> Ben
>
>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Hi,
>>
>> Is there way to get the kafka timestamp in deserialization schema? All
>> records are written to kafka with timestamp and I would like to set that
>> timestamp to every record that is ingested. Thanks.
>
>