An operator like the one below exposes the lag between the current processing
time and the event time of each record passing through it. I add it once after
the source and once before the sink, then calculate sink_delay - source_delay
in Grafana. Would that be a generic solution to the problem?
```
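import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Pass-through operator that exposes (processing time - record timestamp) as a gauge. */
public class EmitLagOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  // volatile: written by the task thread, read by the metrics reporter thread.
  private transient volatile long delay;

  public EmitLagOperator() {
    // Chain to the neighbouring operator whenever possible.
    chainingStrategy = ChainingStrategy.ALWAYS;
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    // Records without a timestamp (e.g. in pure processing-time jobs) are
    // skipped, since getTimestamp() would not be meaningful for them.
    if (element.hasTimestamp()) {
      long now = getProcessingTimeService().getCurrentProcessingTime();
      delay = now - element.getTimestamp();
    }
    output.collect(element);
  }

  @Override
  public void open() throws Exception {
    super.open();
    // Register the gauge; it always reports the lag of the most recent record.
    getRuntimeContext()
        .getMetricGroup()
        .gauge("delay", new Gauge<Long>() {
          @Override
          public Long getValue() {
            return delay;
          }
        });
  }
}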
```
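
To make the Grafana arithmetic concrete, here is a minimal plain-Java sketch
with no Flink dependency. `LagProbe` and `pipelineLatencyMillis` are
hypothetical names standing in for the two gauges and the dashboard query, and
the timestamps are made-up values chosen only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class LagDifferenceSketch {

  /** Hypothetical stand-in for one gauge: remembers the lag of the last record seen. */
  static final class LagProbe {
    private final AtomicLong lastLagMillis = new AtomicLong();

    void observe(long nowMillis, long eventTimeMillis) {
      lastLagMillis.set(nowMillis - eventTimeMillis);
    }

    long lagMillis() {
      return lastLagMillis.get();
    }
  }

  /** What the dashboard query would compute: sink_delay - source_delay. */
  static long pipelineLatencyMillis(LagProbe source, LagProbe sink) {
    return sink.lagMillis() - source.lagMillis();
  }

  public static void main(String[] args) {
    long eventTime = 1_000L;     // timestamp carried by the record (illustrative)
    long seenAtSource = 1_250L;  // clock reading at the probe after the source
    long seenAtSink = 1_400L;    // clock reading at the probe before the sink

    LagProbe source = new LagProbe();
    LagProbe sink = new LagProbe();
    source.observe(seenAtSource, eventTime);
    sink.observe(seenAtSink, eventTime);

    System.out.println("source_delay = " + source.lagMillis());                   // 250
    System.out.println("sink_delay   = " + sink.lagMillis());                     // 400
    System.out.println("latency      = " + pipelineLatencyMillis(source, sink));  // 150
  }
}
```

The event time cancels out only when both gauges describe the same record.
Since each gauge holds just the most recent record's lag and the two are
sampled independently, the difference on a dashboard is an approximation.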

On Wed, Apr 1, 2020 at 7:59 PM zoudan <zoud...@163.com> wrote:

> Hi,
> I think we could add a latency metric for each operator, which would
> reflect each operator's consumption capacity.
>
> Best,
> Dan Zou
>
>
> On Mar 30, 2020, at 18:19, Guanghui Zhang <beggingh...@gmail.com> wrote:
>
> Hi,
> In the Flink source connector, you can emit a
> $source_current_time - $event_time metric.
> Likewise, in the Flink sink connector, you can emit a
> $sink_current_time - $event_time metric.
> Then you can use ($sink_current_time - $event_time) - ($source_current_time -
> $event_time) = $sink_current_time - $source_current_time as the end-to-end
> latency.
>
> On Mon, Mar 30, 2020 at 5:15 PM Oscar Westra van Holthe - Kind
> <os...@westravanholthe.nl> wrote:
>
>> On Mon, 30 Mar 2020 at 05:08, Lu Niu <qqib...@gmail.com> wrote:
>>
>>> $current_processing - $event_time works when using event time. What about
>>> processing time? Is there a good way to measure the latency?
>>>
>>
>> To measure latency you'll need some way to determine the time spent
>> between the start and end of your pipeline.
>>
>> To measure latency when using processing time, you'll need to partially
>> use ingestion time. That is, you'll need to add the 'current' processing
>> time as soon as messages are ingested.
>>
>> With it, you can then use the $current_processing - $ingest_time
>> solution that was already mentioned.
>>
>> Kind regards,
>> Oscar
>>
>> --
>> Oscar Westra van Holthe - Kind
>>
>
>
