Hi everyone,

I have implemented a way to measure latency in a DataStream (I hope): I consume a Kafka topic and union the resulting stream with a custom source that emits a (machine-local) timestamp every 1000 ms (using System.currentTimeMillis). On the consuming end I distinguish between the Kafka events and the timestamps. When I encounter a timestamp, I take the difference between the processing machine's local time and the timestamp found in the stream, expecting a positive difference (i.e. the processing machine's timestamp should be larger than the timestamp found in the stream). However, the opposite is the case, and now I am wondering when events are actually processed.
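To make the setup easier to follow, here is a stripped-down sketch of what I am doing. It is not the exact code from the links below: the "TS:" prefix, the TimestampSource/LatencySketch names, and the socket source standing in for the FlinkKafkaConsumer are simplified placeholders, and the fold only tracks a maximum latency instead of the real aggregation, but the structure (custom source + union + 10 s window + fold) is the same:

  import java.util.concurrent.TimeUnit

  import org.apache.flink.streaming.api.functions.source.SourceFunction
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time

  // Marker source: emits the machine-local wall-clock time every 1000 ms.
  class TimestampSource extends SourceFunction[String] {
    @volatile private var running = true

    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      while (running) {
        // Prefix the marker so the consumer can tell it apart from Kafka payloads.
        ctx.collect("TS:" + System.currentTimeMillis())
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = {
      running = false
    }
  }

  object LatencySketch {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Placeholder for the FlinkKafkaConsumer source the real job uses.
      val kafkaStream: DataStream[String] = env.socketTextStream("localhost", 9999)
      val markerStream: DataStream[String] = env.addSource(new TimestampSource)

      kafkaStream
        .union(markerStream)
        // Batch the unioned stream into 10 s windows.
        .timeWindowAll(Time.of(10, TimeUnit.SECONDS))
        // Fold over each window; on a marker, compare it to the local clock.
        .fold(0L) { (maxLatencyMs, value) =>
          if (value.startsWith("TS:")) {
            val emittedAt = value.substring(3).toLong
            // I expect this difference to be positive, but I observe the opposite.
            math.max(maxLatencyMs, System.currentTimeMillis() - emittedAt)
          } else {
            // Regular Kafka record: the real job does its aggregation here.
            maxLatencyMs
          }
        }
        .print()

      env.execute("latency-measurement-sketch")
    }
  }
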
When I union the stream from Kafka with my custom source and batch them into 10 s windows (which is what I do), I expect each window to contain roughly 10 timestamps with ascending values, spaced about 1000 ms apart:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68

On the receiving end I take currentTimeMillis again in my fold function, expecting the resulting value to be larger (most of the time) than the timestamps encountered in the stream:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53

The system clocks are in sync to within 1 ms. Maybe I am not clear about when certain timestamps are created (i.e. when the UDFs are invoked) or about how windows are processed. Any advice is greatly appreciated, as are alternative approaches to measuring latency. I'm on Flink 0.10.2, by the way.

Thanks in advance for the help!

Robert

--
My GPG Key ID: 336E2680