Hello,

I'm getting messages from a kafka stream. The messages are JSON records with a "timestamp" key in the json. This timestamp key contains the time at which the message was generated. Now i'd like if these messages had a delivery delay (eg delay between message generation and arrival in kafka). So i don't want to have the "full" delay (eg difference between generation time and processing time), just de delivery delay.

In my timestamp assigner i get a "long" with the original timestamp as an argument, but i cannot yield an updated record from the timestamp assigner (eg with an extra field "deliveryDelay" or so).

So i guess my only option is to not specify the timestamp/watermark extractor in the env.fromSource, then first mapping the stream to add a lateness field and only after that reassign timestamps/watermarks ... is that right ?

Thanks!

Greetings,
Frank



Reply via email to