Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-17 Thread Jia Teoh
I'll take a look into the ProcessFunction, thanks for the suggestion. -Jia On Wed, May 17, 2017 at 12:33 AM, Tzu-Li (Gordon) Tai wrote: > Hi Jia, > > Actually just realized you can access the timestamp of records via the > more powerful `ProcessFunction` [1]. > That’ll be a bit easier to use th

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-17 Thread Tzu-Li (Gordon) Tai
Hi Jia, Actually, I just realized that you can access the timestamp of records via the more powerful `ProcessFunction` [1]. That’ll be a bit easier to use than implementing your own custom operator, which is quite low-level. Cheers, Gordon [1] https://ci.apache.org/projects/flink/flink-docs-release-1

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-16 Thread Jia Teoh
Hi Gordon, The timestamps are required for application logic. Thank you for clarifying the custom operators; it seems I mistakenly thought of the functions that are passed to the operators rather than the operators themselves. AbstractStreamOperator and the other classes you mentioned seem like exac

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-16 Thread Tzu-Li (Gordon) Tai
Hi Jia, How exactly do you want to use the Kafka timestamps? Do you want to access them and alter them with new values as the record timestamp? Or do you want to use them for some application logic in your functions? If it's the former, you should be able to do that by using timestamp / watermar

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-16 Thread Jia Teoh
Hi Robert, Thanks for the reply. I ended up implementing an extension of the Kafka fetcher and consumer so that the deserialization API can include the timestamp field, which is sufficient for my specific use case. I can share the code if desired, but it seems like it's an intentional design decisi
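The message above describes extending the Kafka fetcher and consumer so that the deserialization API also receives the record timestamp. That code was not posted in this thread, so the following is only a minimal plain-Java sketch of what such a timestamp-aware deserialization contract could look like. Every name in it (`TimestampedDeserializer`, `TimestampedString`, `StringWithTimestamp`) is hypothetical and is not part of Flink or Kafka. It does not show how a real fetcher would be extended to call it.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch only: a deserialization contract that, unlike the one
 * discussed in this thread, also receives the Kafka record timestamp.
 * None of these type names come from Flink or Kafka.
 */
public class TimestampedDeserializationSketch {

    /** Hypothetical timestamp-aware deserialization contract. */
    interface TimestampedDeserializer<T> {
        T deserialize(byte[] key, byte[] value, String topic,
                      int partition, long offset, long kafkaTimestamp);
    }

    /** Hypothetical output type: the decoded payload plus its Kafka timestamp. */
    static final class TimestampedString {
        final String value;
        final long kafkaTimestamp;

        TimestampedString(String value, long kafkaTimestamp) {
            this.value = value;
            this.kafkaTimestamp = kafkaTimestamp;
        }

        @Override
        public String toString() {
            return value + "@" + kafkaTimestamp;
        }
    }

    /** Decodes the value bytes as UTF-8 and keeps the timestamp as part of the record value. */
    static final class StringWithTimestamp implements TimestampedDeserializer<TimestampedString> {
        @Override
        public TimestampedString deserialize(byte[] key, byte[] value, String topic,
                                             int partition, long offset, long kafkaTimestamp) {
            return new TimestampedString(new String(value, StandardCharsets.UTF_8), kafkaTimestamp);
        }
    }

    public static void main(String[] args) {
        TimestampedDeserializer<TimestampedString> schema = new StringWithTimestamp();
        // Simulated raw fields, as a (hypothetical) extended fetcher might pass them in.
        TimestampedString record = schema.deserialize(
                null, "hello".getBytes(StandardCharsets.UTF_8), "events", 0, 42L, 1494460800000L);
        System.out.println(record); // prints hello@1494460800000
    }
}
```

The design choice here is to carry the timestamp inside the emitted value type. Downstream functions can then read it as ordinary data, which matches the "use them as values within my application flow" requirement. This is separate from Flink's internal record timestamp.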

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-12 Thread Robert Metzger
Hi Jia, The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but it is extensible / pluggable so that the Kafka 0.9 fetcher can also read the event timestamps from Kafka 0.10. We don't expose the timestamp through the deserialization API, because we set it internally in Flink. (there i

Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-11 Thread Jia Teoh
Hi, Is there a way to retrieve the timestamps that Kafka associates with each key-value pair within Flink? I would like to be able to use these as values within my application flow, and defining them before or after Kafka is not acceptable for the use case due to the latency involved in sending or