We have a use case where multiple topics are streamed to HDFS, and we want to create buckets based on ingestion time (the time the events were pushed to Kafka). Our Kafka producers will set that as the event time.
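To make the intent concrete, here is a minimal sketch of what we would like to write. It assumes the source hands the Kafka record timestamp to a timestamp assigner as previousElementTimestamp. The TimestampAssigner interface below is a local stand-in that only mirrors the shape of Flink's extractTimestamp method so the snippet compiles without Flink on the classpath. Event and KafkaTimestampStamper are hypothetical names; only setIngestionTime(long) reflects our real element type.

```java
// Local stand-in mirroring the shape of Flink's TimestampAssigner#extractTimestamp.
// Declared here only so the sketch compiles on its own (assumption, not Flink code).
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

// Hypothetical element type; only setIngestionTime(long) comes from our real code.
class Event {
    private final String payload;
    private long ingestionTime = Long.MIN_VALUE; // marker for "not set yet"

    Event(String payload) {
        this.payload = payload;
    }

    void setIngestionTime(long time) {
        this.ingestionTime = time;
    }

    long getIngestionTime() {
        return ingestionTime;
    }

    String getPayload() {
        return payload;
    }
}

// Copies the timestamp handed in by the source onto the element itself and
// returns it unchanged as the element's event time.
class KafkaTimestampStamper implements TimestampAssigner<Event> {
    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        element.setIngestionTime(previousElementTimestamp); // in-place mutation
        return previousElementTimestamp;
    }
}
```

The sketch only shows the pattern in plain Java. Whether the same in-place mutation is still visible to downstream operators inside a Flink job is exactly what we are unsure about.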
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010 suggests that "previousElementTimestamp" will carry that timestamp, provided the "EventTime" time characteristic is set. The same method also receives the element. In our case the element will expose a setIngestionTime(long time) method.

Is the element in this method

public long extractTimestamp(Long element, long previousElementTimestamp)

passed by reference, and can it be safely (losslessly) mutated for downstream operators?

That said, there is another place where the record timestamp is available:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

Is it possible to change the signature of

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46

to add the record timestamp as the last argument?

Regards,
Vishal