[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426704#comment-15426704 ]
ASF GitHub Bot commented on FLINK-4389: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r75337579 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.dispatch.OnSuccess; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. + * + * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since + * the last call has passed. + */ +public class MetricFetcher { + private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); + + private final JobManagerRetriever retriever; + private final ExecutionContext ctx; + private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS); + + private MetricStore metrics = new MetricStore(); + + private long lastUpdateTime; + + public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) { + this.retriever = Preconditions.checkNotNull(retriever); + this.ctx = Preconditions.checkNotNull(ctx); + } + + /** + * Returns the MetricStore containing all stored metrics. 
+ * + * @return MetricStore containing all stored metrics; + */ + public MetricStore getMetricStore() { + return metrics; + } + + /** + * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated. + */ + public void update() { + synchronized (this) { + long currentTime = System.currentTimeMillis(); + if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update + lastUpdateTime = currentTime; + fetchMetrics(); + } + } + } + + private void fetchMetrics() { + try { + Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); + if (jobManagerGatewayAndWebPort.isDefined()) { + ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + + /** + * Remove all metrics that belong to a job that is not running and no longer archived. + */ + jobManager.ask(new RequestJobDetails(true, true), timeout) + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + MultipleJobsDetails details = (MultipleJobsDetails) result; + ArrayList<String> toRetain = new ArrayList<>(); + for (JobDetails job : details.getRunningJobs()) { + toRetain.add(job.getJobId().toString()); + } + for (JobDetails job : details.getFinishedJobs()) { + toRetain.add(job.getJobId().toString()); + } + synchronized (metrics) { + metrics.jobs.keySet().retainAll(toRetain); + } + } + }, ctx); + + /** + * Requests the metric dump from the job manager. + */ + jobManager.ask(Messages.getRequestMetrics(), timeout) + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + addMetrics(result); + } + }, ctx); + + /** + * We first request the list of all registered task managers from the job manager, and then + * request the respective metric dump from each task manager. + * + * All stored metrics that do not belong to a registered task manager will be removed. 
+ */ + jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout) + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); + List<String> activeTaskManagers = new ArrayList<>(); + for (Instance taskManager : taskManagers) { + activeTaskManagers.add(taskManager.getId().toString()); + taskManager.getActorGateway().ask(Messages.getRequestMetrics(), timeout) + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + addMetrics(result); + } + }, ctx); + } + synchronized (metrics) { // remove all metrics belonging to unregistered task managers + metrics.taskManagers.keySet().retainAll(activeTaskManagers); + } + } + }, ctx); + } + } catch (Exception e) { + LOG.warn("Exception while fetching metrics.", e); + } + } + + private void addMetrics(Object result) { + Object[] data = (Object[]) result; + for (int x = 0; x < data.length; ) { + synchronized (metrics) { + switch ((byte) data[x++]) { + case 0: + case 1: + String name = (String) data[x++]; + Object value = data[x++]; + metrics.add(name, value); + break; + case 2: + String histogramName = (String) data[x++]; + metrics.add(histogramName + "_min", data[x++]); + metrics.add(histogramName + "_max", data[x++]); + metrics.add(histogramName + "_mean", data[x++]); + metrics.add(histogramName + "_median", data[x++]); + metrics.add(histogramName + "_stddev", data[x++]); + metrics.add(histogramName + "_p75", data[x++]); + metrics.add(histogramName + "_p90", data[x++]); + metrics.add(histogramName + "_p95", data[x++]); + metrics.add(histogramName + "_p98", data[x++]); + metrics.add(histogramName + "_p99", data[x++]); + metrics.add(histogramName + "_p999", data[x++]); --- End diff -- This is probably quite error prone, because whenever the histogram is changed (e.g. 
order of serialized fields) it has to be reflected here as well. I think it would be better to have a single serializer that handles both the serialization and the deserialization in one place. > Expose metrics to Webfrontend > ----------------------------- > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend > Affects Versions: 1.1.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Fix For: pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface -- This message was sent by Atlassian JIRA (v6.3.4#6332)