Hi Jia,

Actually, I just realized you can access the timestamp of records via the more 
powerful `ProcessFunction` [1].
That'll be a bit easier to use than implementing your own custom operator, 
which is quite low-level.
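
For example, something along these lines (an untested sketch; the exact base 
class and where the function can be applied may differ slightly across Flink 
versions, see the linked docs):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: forwards each element together with its record timestamp.
public class TimestampedEcho extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // ctx.timestamp() exposes the element's timestamp (null if none has been assigned).
        Long timestamp = ctx.timestamp();
        out.collect(value + " @ " + timestamp);
    }
}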

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiat...@gmail.com) wrote:

Hi Gordon,

The timestamps are required for application logic. Thank you for clarifying the 
custom operators - it seems I had mistakenly thought of the functions that are 
passed to the operators rather than the operators themselves. 
AbstractStreamOperator and the other classes you mentioned seem like exactly 
what I'm looking for, so I will take a look into implementing a custom one for 
my use case.

Thank you,
Jia

On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Jia,

How exactly do you want to use the Kafka timestamps? Do you want to access them 
and alter them with new values as the record timestamp? Or do you want to use 
them for some application logic in your functions?

If it's the former, you should be able to do that by using timestamp / watermark 
extractors. They come with an interface that exposes the current timestamp of 
the record. For Kafka 0.10, that timestamp would be the Kafka record’s 
timestamp if it hasn’t been explicitly assigned any other timestamp yet.
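
For example, a periodic watermark assigner exposes that timestamp through 
`extractTimestamp` (a minimal, untested sketch; `MyEvent` is just a placeholder 
for your element type):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical sketch; MyEvent is a placeholder for your element type.
public class KafkaTimestampAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        // For the Kafka 0.10 consumer, previousElementTimestamp is the Kafka record's
        // timestamp, unless some other timestamp has already been assigned upstream.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Overly simple watermark; a real assigner would allow for out-of-order records.
        return new Watermark(currentMaxTimestamp);
    }
}

You would register it with something like 
stream.assignTimestampsAndWatermarks(new KafkaTimestampAssigner()).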

If it's the latter, then I think currently you have to use custom operators, as 
Robert mentioned.
Custom operators are classes that extend the `AbstractStreamOperator` base 
class and implement one of the `OneInputStreamOperator` or 
`TwoInputStreamOperator` interfaces.
You can take a look at the basic `StreamMap` or `StreamFlatMap` classes for an 
example, which are the underlying operators for the map and flatMap functions.

At the operator level, you’ll have access to a `StreamRecord` in the 
`processElement` method, which wraps both the record value (the value you get 
when implementing functions) and the internal timestamp that comes with the 
record.
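
To make that concrete, here is a minimal, untested sketch (assuming String 
elements purely for illustration):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical sketch: pairs each String element with its internal timestamp.
public class TimestampExtractingOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // The StreamRecord wraps both the value and the record's internal timestamp.
        long timestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        output.collect(element.replace(element.getValue() + " @ " + timestamp));
    }
}

You can then attach it to a stream with something like 
stream.transform("extract-timestamp", BasicTypeInfo.STRING_TYPE_INFO, new 
TimestampExtractingOperator()).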

Cheers,
Gordon


On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiat...@gmail.com) wrote:

Hi Robert,

Thanks for the reply. I ended up implementing an extension of the Kafka fetcher 
and consumer so that the deserialization API can include the timestamp field, 
which is sufficient for my specific use case. I can share the code if desired 
but it seems like it's an intentional design decision not to expose the 
timestamp in the deserialization API.

I noticed you mentioned that I could use a custom operator to access the record 
event time. Could you elaborate on what you mean by "operator"? I initially 
thought that referred to DataStream.map/reduce/etc, but none of those functions 
provide arguments that can be used to extract the embedded timestamp. 

Thanks,
Jia Teoh

On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetz...@apache.org> wrote:
Hi Jia,

The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but it is 
extensible / pluggable, so the Kafka 0.9 fetcher can also read the event 
timestamps from Kafka 0.10.
We don't expose the timestamp through the deserialization API, because we set 
it internally in Flink (each record carries a "hidden" field containing its 
event time).

With a custom operator you can access the event time of a record.

On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiat...@gmail.com> wrote:
Hi,

Is there a way to retrieve the timestamps that Kafka associates with each 
key-value pair within Flink? I would like to be able to use these as values 
within my application flow, and defining them before or after Kafka is not 
acceptable for the use case due to the latency involved in sending or receiving 
from Kafka.

It seems that Flink supports Kafka event time (link), but after a brief trace it 
looks like KafkaConsumer010 still relies on the Kafka09Fetcher for iterating 
through each Kafka record and deserializing it. The KeyedDeserializationSchema 
API does not seem to support including the timestamp as additional metadata 
(just offset, topic, and partition), so something such as 
JSONKeyValueDeserializationSchema will not return the Kafka-specified timestamp.

For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka 
Connector (1.2.1).

Thanks,
Jia Teoh


