Hi all, I want to know each element's latency before it is written to Elasticsearch, so I am registering a custom metric as follows:
/**
 * Elasticsearch sink function that turns each [[EventEntry]] into an index request
 * and exposes an `esLatency` gauge holding the difference, in milliseconds, between
 * the current wall-clock time and the element's `executeTime` at the moment the
 * request is built.
 */
class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {

  // Holds the metric group once the gauge has been registered; `None` means
  // registration has not happened yet for this instance.
  private var metricGroup: Option[MetricGroup] = None

  // Most recent latency sample. It is written by whichever thread calls `process`
  // and read through the gauge closure; the gauge is presumably polled from a
  // different (reporter) thread, so the field is volatile to make every write
  // visible to that reader.
  @volatile private var latency: Long = _

  /**
   * Registers the `esLatency` gauge on first use; later calls are no-ops.
   *
   * @param runtimeContext context whose metric group receives the gauge
   */
  private def init(runtimeContext: RuntimeContext): Unit =
    if (metricGroup.isEmpty) {
      val group = runtimeContext.getMetricGroup
      group.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
      // Remember the group only after registration succeeded.
      metricGroup = Some(group)
    }

  /**
   * Records the latency sample for `element` and builds its index request.
   *
   * @param element        event to index; its `executeTime` is the latency baseline
   * @param runtimeContext context used to lazily register the gauge
   * @return an index request for index `test`, type `event`, with `element.json` as source
   */
  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  /** Builds the index request for `element` and hands it to `requestIndexer`. */
  override def process(
      element: EventEntry,
      runtimeContext: RuntimeContext,
      requestIndexer: RequestIndexer
  ): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}
but that does not seem to work, Does anyone know why? Regards wyp