Hello.

Flink emits watermark metrics (currentWatermark) as a Unix timestamp, which is 
useful in some contexts but troublesome in others. For instance, when sending 
data to Datadog, there is no way to meaningfully see or act upon this metric, 
because Datadog has no support for timestamps.

A more useful metric would be the delta between the current watermark and the 
wall-clock time.
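
To make the idea concrete, here is a minimal sketch of the delta I have in 
mind. The names (watermarkLagMillis, NO_WATERMARK) are hypothetical and not 
part of Flink, and I am assuming that Long.MIN_VALUE is what gets reported 
before any watermark has been seen, so that case is skipped rather than 
turned into an overflowing subtraction:

```kotlin
// Hypothetical helper, not a Flink API: converts a watermark timestamp
// (epoch milliseconds) into a lag relative to the wall clock.
// Assumption: Long.MIN_VALUE means "no watermark seen yet".
const val NO_WATERMARK: Long = Long.MIN_VALUE

fun watermarkLagMillis(
    currentWatermark: Long,
    nowMillis: Long = System.currentTimeMillis()
): Long? {
    // Subtracting Long.MIN_VALUE would overflow, so report nothing instead.
    if (currentWatermark == NO_WATERMARK) return null
    return nowMillis - currentWatermark
}

fun main() {
    val now = 1_700_000_005_000L
    println(watermarkLagMillis(1_700_000_000_000L, now)) // 5000
    println(watermarkLagMillis(NO_WATERMARK, now))       // null
}
```

A value like this is a plain number of milliseconds, so it can be graphed 
and alerted on like any other gauge.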

So I was trying to emit that metric myself from my job, but I'm quite lost. 
This is what I have tried:

1. I used a RichMapFunction, expecting to somehow get the current watermark 
from the runtime context. I could not figure out how to do that, so I tried 
hacking the metrics to pull the watermark gauge out of the metric group. 
Something like this:

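import org.apache.flink.metrics.Gauge
import org.apache.flink.metrics.Metric
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup

// Member of the RichMapFunction: reads the private "metrics" map of the
// function's metric group via reflection and returns the named gauge.
@Suppress("UNCHECKED_CAST")
private fun getOperatorWatermarkGauge(metricName: String): Gauge<Long> {
  return try {
    val metricsField = AbstractMetricGroup::class.java.getDeclaredField("metrics")
    metricsField.isAccessible = true
    val metrics = metricsField.get(runtimeContext.metricGroup) as Map<String, Metric>
    metrics[metricName] as Gauge<Long>
  } catch (e: Exception) {
    LOGGER.error("Failed to get input watermark metric. Using no-op one", e)
    Gauge { 0L } // NO-OP gauge
  }
}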

My idea was to use the inner gauge to get the current watermark and then emit 
the delta. That didn't work: the gauge does not return sensible values.

2. I tried creating a custom operator based on 
TimestampsAndPeriodicWatermarksOperator that overrides the processWatermark 
function to get the current watermark. For some reason, that method is not 
called at all.

3. I might try to wrap the Datadog reporter to intercept the watermark gauges 
and emit the delta from there.

So before I keep digging into this, I would like more opinions, because right 
now it feels like I'm fighting against the API, and it seems to me that there 
should be a clean way to achieve this.

Thanks.
