Hello,
I'm getting messages from a Kafka stream. The messages are JSON records
with a "timestamp" key that contains the time at which the message was
generated. I'd like to know whether these messages had a delivery delay
(i.e. a delay between message generation and arrival in Kafka). So I
don't want the "full" delay (i.e. the difference between generation time
and processing time), just the delivery delay.
In my timestamp assigner I get a "long" with the original timestamp as
an argument, but I cannot emit an updated record from the timestamp
assigner (e.g. with an extra field "deliveryDelay" or similar).
So I guess my only option is to not specify the timestamp/watermark
extractor in env.fromSource, then first map the stream to add the
delay field, and only after that reassign timestamps/watermarks.
Is that right?
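
To make the mapping step concrete, here is a rough plain-Java sketch of
the calculation I have in mind, without any Flink wiring. It assumes
both the "timestamp" value in the JSON and the Kafka record timestamp
are epoch milliseconds, and that the record has already been parsed into
a Map. The class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class DeliveryDelaySketch {

    // Delivery delay = time the record arrived in Kafka minus the
    // generation time carried in the payload (both assumed epoch millis).
    static long deliveryDelayMillis(long generatedAtMillis, long kafkaTimestampMillis) {
        return kafkaTimestampMillis - generatedAtMillis;
    }

    // Returns a copy of the parsed JSON record with an extra
    // "deliveryDelay" field; the input map is left untouched.
    static Map<String, Object> withDeliveryDelay(Map<String, Object> record,
                                                 long kafkaTimestampMillis) {
        long generatedAt = ((Number) record.get("timestamp")).longValue();
        Map<String, Object> enriched = new HashMap<>(record);
        enriched.put("deliveryDelay", deliveryDelayMillis(generatedAt, kafkaTimestampMillis));
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("timestamp", 1_700_000_000_000L);
        Map<String, Object> enriched = withDeliveryDelay(record, 1_700_000_000_250L);
        System.out.println(enriched.get("deliveryDelay")); // prints 250
    }
}
```

The open question is where in the pipeline the Kafka timestamp is still
available so that something like withDeliveryDelay can be called with it.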
Thanks!
Greetings,
Frank