
ASF GitHub Bot commented on FLINK-4389:

Github user zentol commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,185 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.metrics;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.Messages;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
    +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    + * The MetricFetcher can be used to fetch metrics from the JobManager and 
all registered TaskManagers.
    + *
    + * Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
    + * the last call has passed.
    + */
    +public class MetricFetcher {
    +   private static final Logger LOG = 
    +   private final JobManagerRetriever retriever;
    +   private final ExecutionContext ctx;
    +   private final FiniteDuration timeout = new 
    +   private MetricStore metrics = new MetricStore();
    +   private long lastUpdateTime;
    +   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext 
ctx) {
    +           this.retriever = Preconditions.checkNotNull(retriever);
    +           this.ctx = Preconditions.checkNotNull(ctx);
    +   }
    +   /**
    +    * Returns the MetricStore containing all stored metrics.
    +    *
    +    * @return MetricStore containing all stored metrics;
    +    */
    +   public MetricStore getMetricStore() {
    +           return metrics;
    +   }
    +   /**
    +    * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
    +    */
    +   public void update() {
    +           synchronized (this) {
    +                   long currentTime = System.currentTimeMillis();
    +                   if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
    +                           lastUpdateTime = currentTime;
    +                           fetchMetrics();
    +                   }
    +           }
    +   }
    +   private void fetchMetrics() {
    +           try {
    +                   Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    +                   if (jobManagerGatewayAndWebPort.isDefined()) {
    +                           ActorGateway jobManager = 
    +                           /**
    +                            * Remove all metrics that belong to a job that 
is not running and no longer archived.
    +                            */
    +                           jobManager.ask(new RequestJobDetails(true, 
true), timeout)
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   MultipleJobsDetails 
details = (MultipleJobsDetails) result;
    +                                                   ArrayList<String> 
toRetain = new ArrayList<>();
    +                                                   for (JobDetails job : 
details.getRunningJobs()) {
    +                                                   }
    +                                                   for (JobDetails job : 
details.getFinishedJobs()) {
    +                                                   }
    +                                                   synchronized (metrics) {
    +                                                   }
    +                                           }
    +                                   }, ctx);
    +                           /**
    +                            * Requests the metric dump from the job 
    +                            */
    +                           jobManager.ask(Messages.getRequestMetrics(), 
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   addMetrics(result);
    +                                           }
    +                                   }, ctx);
    +                           /**
    +                            * We first request the list of all registered 
task managers from the job manager, and then
    +                            * request the respective metric dump from each 
task manager.
    +                            *
    +                            * All stored metrics that do not belong to a 
registered task manager will be removed.
    +                            */
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout)
    +                                   .onSuccess(new OnSuccess<Object>() {
    +                                           @Override
    +                                           public void onSuccess(Object 
result) throws Throwable {
    +                                                   Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
    +                                                   List<String> 
activeTaskManagers = new ArrayList<>();
    +                                                   for (Instance 
taskManager : taskManagers) {
taskManager.getActorGateway().ask(Messages.getRequestMetrics(), timeout)
.onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) throws Throwable {
    +                                                                   }, ctx);
    +                                                   }
    +                                                   synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
    +                                                   }
    +                                           }
    +                                   }, ctx);
    +                   }
    +           } catch (Exception e) {
    +                   LOG.warn("Exception while fetching metrics.", e);
    +           }
    +   }
    +   private void addMetrics(Object result) {
    +           Object[] data = (Object[]) result;
    +           for (int x = 0; x < data.length; ) {
    +                   synchronized (metrics) {
    +                           switch ((byte) data[x++]) {
    +                                   case 0:
    +                                   case 1:
    --- End diff --
    We could use the constants defined in the MetricQueryService here instead 
of the literal case values 0 and 1, or convert those constants to an enum.

> Expose metrics to Webfrontend
> -----------------------------
>                 Key: FLINK-4389
>                 URL: https://issues.apache.org/jira/browse/FLINK-4389
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics, Webfrontend
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface

This message was sent by Atlassian JIRA

Reply via email to