Hi Frank,

I'm not sure exactly what you are trying to accomplish, but yes. In the TimestampAssigner you can only return what should be the new timestamp for the given record.

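
To make that constraint concrete, here is a minimal sketch. The interface is a local stand-in that mirrors the shape of Flink's `TimestampAssigner#extractTimestamp(T element, long recordTimestamp)`, so the snippet runs without Flink on the classpath; the `Event` class and its field names are hypothetical.

```java
public class AssignerSketch {
    // Local stand-in mirroring the shape of Flink's TimestampAssigner<T>.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical event type: "timestamp" is the generation time
    // taken from the JSON payload (epoch milliseconds).
    static final class Event {
        final String payload;
        final long timestamp;

        Event(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // The assigner can only hand back a long. It has no way to emit
    // a modified copy of the element with an extra field.
    static final TimestampAssigner<Event> EVENT_TIME =
            (element, recordTimestamp) -> element.timestamp;

    public static void main(String[] args) {
        Event e = new Event("{}", 1_645_000_000_000L);
        long assigned = EVENT_TIME.extractTimestamp(e, 1_645_000_000_250L);
        System.out.println(assigned);
    }
}
```

Since the return type is a plain `long`, any extra field such as a delay has to be added in a separate operator.
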
If you want to use "ingestion time" - "true event time" as some kind of delay metric, you will indeed need to have both of them calculated somewhere. You could:

1. As you described, first use an ingestion time assigner, then a mapper function to extract that timestamp into a separate field, then reassign the true event time and calculate the delay.
2. Or simply assign the correct event time and, in a single mapper chained directly to the source, use for example `System.currentTimeMillis() - eventTime` to calculate the delay in one step. After all, that's more or less what Flink does to calculate the ingestion time [1].

Best,
Piotrek

[1] https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IngestionTimeAssigner.java

On Wed, 16 Feb 2022 at 09:46, Frank Dekervel <fr...@kapernikov.com> wrote:

> Hello,
>
> I'm getting messages from a Kafka stream. The messages are JSON records
> with a "timestamp" key in the JSON. This timestamp key contains the time
> at which the message was generated. Now I'd like to know if these messages
> had a delivery delay (e.g. the delay between message generation and arrival
> in Kafka). So I don't want to have the "full" delay (e.g. the difference
> between generation time and processing time), just the delivery delay.
>
> In my timestamp assigner I get a "long" with the original timestamp as
> an argument, but I cannot yield an updated record from the timestamp
> assigner (e.g. with an extra field "deliveryDelay" or so).
>
> So I guess my only option is to not specify the timestamp/watermark
> extractor in the env.fromSource, then first map the stream to add a
> lateness field, and only after that reassign timestamps/watermarks ...
> is that right?
>
> Thanks!
>
> Greetings,
> Frank
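
Option 2 from the reply above could be sketched roughly as follows. This is plain Java with no Flink dependency: `withDelay` stands in for the body of a map function chained directly to the source, and the `Event` and `EventWithDelay` classes, their field names, and the injectable clock are all hypothetical choices made for illustration.

```java
import java.util.function.LongSupplier;

public class DeliveryDelaySketch {
    // Hypothetical input record: "timestamp" is the generation time
    // parsed from the JSON payload (epoch milliseconds).
    static final class Event {
        final String payload;
        final long timestamp;

        Event(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical output record carrying the extra delay field.
    static final class EventWithDelay {
        final Event event;
        final long deliveryDelayMs;

        EventWithDelay(Event event, long deliveryDelayMs) {
            this.event = event;
            this.deliveryDelayMs = deliveryDelayMs;
        }
    }

    // Stand-in for the mapper body: wall-clock "now" minus event time.
    // The clock is passed in so the logic can be tested deterministically.
    static EventWithDelay withDelay(Event e, LongSupplier clock) {
        return new EventWithDelay(e, clock.getAsLong() - e.timestamp);
    }

    public static void main(String[] args) {
        Event e = new Event("{}", System.currentTimeMillis() - 250L);
        EventWithDelay out = withDelay(e, System::currentTimeMillis);
        System.out.println("deliveryDelayMs=" + out.deliveryDelayMs);
    }
}
```

In a real job the same subtraction would sit inside a map function applied right after the source, with `System::currentTimeMillis` as the clock. The result depends on the producer's clock and the Flink machine's clock being reasonably in sync.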