An operator like the one below exposes the lag between the current
processing time and the timestamp of each record passing through it. I add
one right after the source and another right before the sink, then compute
sink_delay - source_delay in Grafana. Would that be a generic solution to
the problem?
```
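import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Pass-through operator that exposes (processing time - record timestamp) as a gauge. */
public class EmitLagOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    // Written by the task thread, read by the metric reporter thread.
    private transient volatile long delay;

    public EmitLagOperator() {
        // Allow chaining with the neighbouring operators.
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // A record without a timestamp would yield a meaningless lag, so skip it.
        if (element.hasTimestamp()) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            delay = now - element.getTimestamp();
        }
        // Forward the record unchanged.
        output.collect(element);
    }

    @Override
    public void open() throws Exception {
        super.open();
        getRuntimeContext()
                .getMetricGroup()
                .gauge("delay", new Gauge<Long>() {
                    @Override
                    public Long getValue() {
                        return delay;
                    }
                });
    }
}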
```
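To double-check the arithmetic behind sink_delay - source_delay, here is a
tiny Flink-free sketch. The class name LagMath and the sample timestamps are
made up for illustration only. It shows that the record timestamp cancels
out when both lags are taken from the same record, so the difference is just
the time spent between the two operators.

```java
public class LagMath {

    /** Lag as computed in processElement: wall-clock time minus record timestamp. */
    public static long lag(long nowMillis, long recordTimestampMillis) {
        return nowMillis - recordTimestampMillis;
    }

    /** What the Grafana expression sink_delay - source_delay evaluates to. */
    public static long pipelineLatency(long sourceLag, long sinkLag) {
        return sinkLag - sourceLag;
    }

    public static void main(String[] args) {
        long eventTime = 1_000L; // hypothetical record timestamp
        long sourceNow = 1_250L; // clock when the record passes the source-side operator
        long sinkNow = 1_900L;   // clock when the same record passes the sink-side operator

        long sourceLag = lag(sourceNow, eventTime); // 250
        long sinkLag = lag(sinkNow, eventTime);     // 900

        // 900 - 250 = 650, which equals sinkNow - sourceNow
        System.out.println(pipelineLatency(sourceLag, sinkLag));
    }
}
```

One caveat: each gauge holds the lag of the last record that operator saw,
so the two values sampled by the reporter may come from different records,
and the difference is only an approximation.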
On Wed, Apr 1, 2020 at 7:59 PM zoudan <[email protected]> wrote:
> Hi,
> I think we could add a latency metric to each operator, which would
> reflect how quickly each operator consumes its input.
>
> Best,
> Dan Zou
>
>
> On Mar 30, 2020, at 18:19, Guanghui Zhang <[email protected]> wrote:
>
> Hi.
> In the Flink source connector, you can emit a
> $source_current_time - $event_time metric.
> Likewise, in the Flink sink connector, you can emit a
> $sink_current_time - $event_time metric.
> Then ($sink_current_time - $event_time) - ($source_current_time -
> $event_time) = $sink_current_time - $source_current_time gives the
> end-to-end latency.
>
> On Mon, Mar 30, 2020 at 5:15 PM, Oscar Westra van Holthe - Kind
> <[email protected]> wrote:
>
>> On Mon, 30 Mar 2020 at 05:08, Lu Niu <[email protected]> wrote:
>>
>>> $current_processing - $event_time works for event time. How about
>>> processing time? Is there a good way to measure the latency?
>>>
>>
>> To measure latency you'll need some way to determine the time spent
>> between the start and end of your pipeline.
>>
>> To measure latency when using processing time, you'll need to partially
>> use ingestion time. That is, you'll need to add the 'current' processing
>> time as soon as messages are ingested.
>>
>> With it, you can then use the $current_processing - $ingest_time
>> solution that was already mentioned.
>>
>> Kind regards,
>> Oscar
>>
>> --
>> Oscar Westra van Holthe - Kind
>>
>
>