[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426704#comment-15426704
 ] 

ASF GitHub Bot commented on FLINK-4389:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2363#discussion_r75337579
  
    --- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.metrics;
    +
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.Messages;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
    +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * The MetricFetcher can be used to fetch metrics from the JobManager and 
all registered TaskManagers.
    + *
    + * Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
    + * the last call has passed.
    + */
    +public class MetricFetcher {
    +   private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
    +
    +   private final JobManagerRetriever retriever;
    +   private final ExecutionContext ctx;
    +   private final FiniteDuration timeout = new 
FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(),
 TimeUnit.MILLISECONDS);
    +
    +   private MetricStore metrics = new MetricStore();
    +
    +   private long lastUpdateTime;
    +
    +   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext 
ctx) {
    +           this.retriever = Preconditions.checkNotNull(retriever);
    +           this.ctx = Preconditions.checkNotNull(ctx);
    +   }
    +
    +   /**
    +    * Returns the MetricStore containing all stored metrics.
    +    *
    +    * @return MetricStore containing all stored metrics;
    +    */
    +   public MetricStore getMetricStore() {
    +           return metrics;
    +   }
    +
    +   /**
    +    * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
    +    */
    +   public void update() {
    +           synchronized (this) {
    +                   long currentTime = System.currentTimeMillis();
    +                   if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
    +                           lastUpdateTime = currentTime;
    +                           fetchMetrics();
    +                   }
    +           }
    +   }
    +
    +   private void fetchMetrics() {
    +           try {
    +                   Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    +                   if (jobManagerGatewayAndWebPort.isDefined()) {
    +                           ActorGateway jobManager = 
jobManagerGatewayAndWebPort.get()._1();
    +
    +                           /**
    +                            * Remove all metrics that belong to a job that 
is not running and no longer archived.
    +                            */
    +                           jobManager.ask(new RequestJobDetails(true, 
true), timeout)
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   MultipleJobsDetails 
details = (MultipleJobsDetails) result;
    +                                                   ArrayList<String> 
toRetain = new ArrayList<>();
    +                                                   for (JobDetails job : 
details.getRunningJobs()) {
    +                                                           
toRetain.add(job.getJobId().toString());
    +                                                   }
    +                                                   for (JobDetails job : 
details.getFinishedJobs()) {
    +                                                           
toRetain.add(job.getJobId().toString());
    +                                                   }
    +                                                   synchronized (metrics) {
    +                                                           
metrics.jobs.keySet().retainAll(toRetain);
    +                                                   }
    +                                           }
    +                                   }, ctx);
    +
    +                           /**
    +                            * Requests the metric dump from the job 
manager.
    +                            */
    +                           jobManager.ask(Messages.getRequestMetrics(), 
timeout)
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   addMetrics(result);
    +                                           }
    +                                   }, ctx);
    +
    +                           /**
    +                            * We first request the list of all registered 
task managers from the job manager, and then
    +                            * request the respective metric dump from each 
task manager.
    +                            *
    +                            * All stored metrics that do not belong to a 
registered task manager will be removed.
    +                            */
    +                           
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout)
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
result).asJavaIterable();
    +                                                   List<String> 
activeTaskManagers = new ArrayList<>();
    +                                                   for (Instance 
taskManager : taskManagers) {
    +                                                           
activeTaskManagers.add(taskManager.getId().toString());
    +                                                           
taskManager.getActorGateway().ask(Messages.getRequestMetrics(), timeout)
    +                                                                   
.onSuccess(new OnSuccess<Object>() {
    +                                                                           
@Override
    +                                                                           
public void onSuccess(Object result) throws Throwable {
    +                                                                           
        addMetrics(result);
    +                                                                           
}
    +                                                                   }, ctx);
    +                                                   }
    +                                                   synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
    +                                                           
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
    +                                                   }
    +                                           }
    +                                   }, ctx);
    +                   }
    +           } catch (Exception e) {
    +                   LOG.warn("Exception while fetching metrics.", e);
    +           }
    +   }
    +
    +   private void addMetrics(Object result) {
    +           Object[] data = (Object[]) result;
    +           for (int x = 0; x < data.length; ) {
    +                   synchronized (metrics) {
    +                           switch ((byte) data[x++]) {
    +                                   case 0:
    +                                   case 1:
    +                                           String name = (String) 
data[x++];
    +                                           Object value = data[x++];
    +                                           metrics.add(name, value);
    +                                           break;
    +                                   case 2:
    +                                           String histogramName = (String) 
data[x++];
    +                                           metrics.add(histogramName + 
"_min", data[x++]);
    +                                           metrics.add(histogramName + 
"_max", data[x++]);
    +                                           metrics.add(histogramName + 
"_mean", data[x++]);
    +                                           metrics.add(histogramName + 
"_median", data[x++]);
    +                                           metrics.add(histogramName + 
"_stddev", data[x++]);
    +                                           metrics.add(histogramName + 
"_p75", data[x++]);
    +                                           metrics.add(histogramName + 
"_p90", data[x++]);
    +                                           metrics.add(histogramName + 
"_p95", data[x++]);
    +                                           metrics.add(histogramName + 
"_p98", data[x++]);
    +                                           metrics.add(histogramName + 
"_p99", data[x++]);
    +                                           metrics.add(histogramName + 
"_p999", data[x++]);
    --- End diff --
    
    This is probably quite error-prone, because whenever the histogram is 
changed (e.g. the order of its serialized fields), that change has to be 
reflected here as well. I think it would be better if we had a serializer 
that does the serialization and deserialization in one place.


> Expose metrics to Webfrontend
> -----------------------------
>
>                 Key: FLINK-4389
>                 URL: https://issues.apache.org/jira/browse/FLINK-4389
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics, Webfrontend
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to