Hi, all
I want to know element's latency before write to Elasticsearch, so I 
registering a custom metrics as follow:


class CustomElasticsearchSinkFunction extends 
ElasticsearchSinkFunction[EventEntry] {
  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _


  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
      metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() 
=> latency))
    }
  }


  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): 
IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }


  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}


but that does not seem to work, Does anyone know why?


Regards
wyp

Reply via email to