An operator like the one below exposes the lag between the current processing time and the event time of each record passing through it. I add it after the source and before the sink, and calculate sink_delay - source_delay in Grafana. Would that be a generic solution to the problem?

```
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class EmitLagOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    // Last observed lag in milliseconds; volatile because metric reporters
    // read the gauge from a different thread than the one processing records.
    private transient volatile long delay;

    public EmitLagOperator() {
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        // Expose the most recent lag as a gauge named "delay".
        getRuntimeContext()
                .getMetricGroup()
                .gauge("delay", new Gauge<Long>() {
                    @Override
                    public Long getValue() {
                        return delay;
                    }
                });
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Records without a timestamp would produce a meaningless lag, so skip them.
        if (element.hasTimestamp()) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            delay = now - element.getTimestamp();
        }
        // Forward the record unchanged.
        output.collect(element);
    }
}
```

On Wed, Apr 1, 2020 at 7:59 PM zoudan <zoud...@163.com> wrote:

> Hi,
> I think we could add a latency metric for each operator, which would
> reflect the consumption ability of each operator.
>
> Best,
> Dan Zou
>
>
> On Mar 30, 2020, at 18:19, Guanghui Zhang <beggingh...@gmail.com> wrote:
>
> Hi.
> At the Flink source connector, you can send a $source_current_time -
> $event_time metric.
> In the meantime, at the Flink sink connector, you can send a
> $sink_current_time - $event_time metric.
> Then you use $sink_current_time - $event_time - ($source_current_time -
> $event_time) = $sink_current_time - $source_current_time as the
> end-to-end latency.
>
> On Mon, Mar 30, 2020 at 5:15 PM, Oscar Westra van Holthe - Kind
> <os...@westravanholthe.nl> wrote:
>
>> On Mon, 30 Mar 2020 at 05:08, Lu Niu <qqib...@gmail.com> wrote:
>>
>>> $current_processing - $event_time works for event time. How about
>>> processing time? Is there a good way to measure the latency?
>>>
>>
>> To measure latency you'll need some way to determine the time spent
>> between the start and end of your pipeline.
>>
>> To measure latency when using processing time, you'll need to partially
>> use ingestion time. That is, you'll need to add the 'current' processing
>> time as soon as messages are ingested.
>>
>> With it, you can then use the $current_processing - $ingest_time
>> solution that was already mentioned.
>>
>> Kind regards,
>> Oscar
>>
>> --
>> Oscar Westra van Holthe - Kind
>>
>
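
For reference, the arithmetic discussed in this thread can be sketched in plain Java, independent of Flink. The class and method names (`LatencySketch`, `lagMillis`, `endToEndMillis`) and the sample timestamps are made up for illustration only. For the processing-time case, the record timestamp would be the ingestion time stamped at the source instead of the event time.

```java
/** Hypothetical helper, not part of Flink: illustrates the lag arithmetic only. */
public class LatencySketch {

    /** Lag of one record at some point in the pipeline: wall clock minus record timestamp. */
    public static long lagMillis(long nowMillis, long recordTimestampMillis) {
        return nowMillis - recordTimestampMillis;
    }

    /** End-to-end latency as the difference of the two lags; the record timestamp cancels out. */
    public static long endToEndMillis(long sourceLagMillis, long sinkLagMillis) {
        return sinkLagMillis - sourceLagMillis;
    }

    public static void main(String[] args) {
        long eventTime = 1_000L; // timestamp carried by the record (event or ingestion time)
        long atSource = 1_250L;  // wall clock when the record passes the source-side operator
        long atSink = 1_900L;    // wall clock when the record passes the sink-side operator

        long sourceLag = lagMillis(atSource, eventTime);
        long sinkLag = lagMillis(atSink, eventTime);

        // Equals atSink - atSource, because eventTime appears in both lags.
        System.out.println(endToEndMillis(sourceLag, sinkLag));
    }
}
```

Note that the cancellation is exact only when both lags come from the same record. Two gauges sampled independently and subtracted in Grafana will usually reflect different records, so the result is probably best read as an approximation.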