We have a use case where multiple topics are streamed to hdfsand we would
want to created buckets based on ingestion time ( the time the event were
pushed to kafka ). Our producers to kafka will set that the event time

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

suggests that the the "previousElementTimeStamp" will provide that
timestamp provided "EventTime" characteristic is set. It also provides for
the element. In out case the element will expose setIngestionTIme(long
time) method. Is the element in this method

public long extractTimestamp(Long element, long previousElementTimestamp)

 passed by reference and can it be safely ( loss lessly ) mutated for
downstream operators ?


That said there is another place where that record time stamp is available.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

Is it possible to change the signature of the

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46

to add record timestamp as the last argument ?

Regards,

Vishal

Reply via email to