[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15465103#comment-15465103 ]

ASF GitHub Bot commented on FLINK-4389:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2363#discussion_r77527637
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---
    @@ -35,109 +46,111 @@
        final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
        final Map<String, JobMetricStore> jobs = new HashMap<>();
     
    -   /**
    -    * Adds a metric to this MetricStore.
    -    *
    -    * @param name  the metric identifier
    -    * @param value the metric value
    -    */
    -   public void add(String name, Object value) {
    -           TaskManagerMetricStore tm;
    -           JobMetricStore job;
    -           TaskMetricStore task;
    -
    +   public void add(MetricDump metric) {
                try {
    -                   String[] components = name.split(":");
    -                   switch (components[0]) {
    -                           /**
    -                            * JobManagerMetricStore metric
    -                            * format: 0:<user_scope>.<name>
    -                            */
    -                           case "0":
    -                                   jobManager.metrics.put(components[1], 
value);
    -                                   break;
    -                           /**
    -                            * TaskManager metric
    -                            * format: 1:<tm_id>:<user_scope>.<name>
    -                            */
    -                           case "1":
    -                                   if (components.length != 3) {
    -                                           break;
    -                                   }
    -                                   tm = taskManagers.get(components[1]);
    +                   QueryScopeInfo info = metric.scopeInfo;
    +                   TaskManagerMetricStore tm;
    +                   JobMetricStore job;
    +                   TaskMetricStore task;
    +
    +                   String name = info.scope.isEmpty()
    +                           ? metric.name
    +                           : info.scope + "." + metric.name;
    +                   
    +                   if (name.isEmpty()) { // malformed transmission
    +                           return;
    +                   }
    +
    +                   switch (info.getCategory()) {
    +                           case INFO_CATEGORY_JM:
    +                                   addMetric(jobManager.metrics, name, 
metric);
    +                           case INFO_CATEGORY_TM:
    +                                   String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
    +                                   tm = taskManagers.get(tmID);
                                        if (tm == null) {
                                                tm = new 
TaskManagerMetricStore();
    -                                           taskManagers.put(components[1], 
tm);
    +                                           taskManagers.put(tmID, tm);
                                        }
    -                                   tm.metrics.put(components[2], value);
    +                                   addMetric(tm.metrics, name, metric);
                                        break;
    -                           /**
    -                            * Job metric
    -                            * format: 2:<job_id>:<user_scope>.<name>
    -                            */
    -                           case "2":
    -                                   if (components.length != 3) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_JOB:
    +                                   QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
    +                                   job = jobs.get(jobInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(jobInfo.jobID, job);
                                        }
    -                                   job.metrics.put(components[2], value);
    +                                   addMetric(job.metrics, name, metric);
                                        break;
    -                           /**
    -                            * Task metric
    -                            * format: 
3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name>
    -                            *
    -                            * As the WebInterface task metric queries 
currently do not account for subtasks we don't 
    -                            * divide by subtask and instead use the 
concatenation of subtask index and metric name as the name. 
    -                            */
    -                           case "3":
    -                                   if (components.length != 5) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_TASK:
    +                                   QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
    +                                   job = jobs.get(taskInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(taskInfo.jobID, job);
                                        }
    -                                   task = job.tasks.get(components[2]);
    +                                   task = job.tasks.get(taskInfo.vertexID);
                                        if (task == null) {
                                                task = new TaskMetricStore();
    -                                           job.tasks.put(components[2], 
task);
    +                                           
job.tasks.put(taskInfo.vertexID, task);
                                        }
    -                                   task.metrics.put(components[3] + "." + 
components[4], value);
    +                                   /**
    +                                    * As the WebInterface task metric 
queries currently do not account for subtasks we don't 
    +                                    * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name. 
    +                                    */
    +                                   addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
                                        break;
    -                           /**
    -                            * Operator metric
    -                            * format: 
4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name>
    -                            *
    -                            * As the WebInterface does not account for 
operators (because it can't) we don't 
    -                            * divide by operator and instead use the 
concatenation of subtask index, operator name and metric name 
    -                            * as the name.
    -                            */
    -                           case "4":
    -                                   if (components.length != 6) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_OPERATOR:
    +                                   QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
    +                                   job = jobs.get(operatorInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(operatorInfo.jobID, 
job);
                                        }
    -                                   task = job.tasks.get(components[2]);
    +                                   task = 
job.tasks.get(operatorInfo.vertexID);
                                        if (task == null) {
                                                task = new TaskMetricStore();
    -                                           job.tasks.put(components[2], 
task);
    +                                           
job.tasks.put(operatorInfo.vertexID, task);
                                        }
    -                                   task.metrics.put(components[3] + "." + 
components[4] + "." + components[5], value);
    +                                   /**
    +                                    * As the WebInterface does not account 
for operators (because it can't) we don't 
    +                                    * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name 
    +                                    * as the name.
    +                                    */
    +                                   addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
                                        break;
                                default:
    -                                   LOG.debug("Invalid metric name format: 
" + name);
    +                                   LOG.debug("Invalid metric dump 
category: " + info.getCategory());
                        }
                } catch (Exception e) {
    -                   LOG.debug("Malformed metric name format: " + name);
    +                   LOG.debug("Malformed metric dump.", e);
    +           }
    +   }
    +
    +   private void addMetric(Map<String, Object> target, String name, 
MetricDump metric) {
    +           switch (metric.getCategory()) {
    +                   case METRIC_CATEGORY_COUNTER:
    --- End diff --
    
    The same question applies here as well. Why don't we simply check what type the metric instance is?


> Expose metrics to Webfrontend
> -----------------------------
>
>                 Key: FLINK-4389
>                 URL: https://issues.apache.org/jira/browse/FLINK-4389
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics, Webfrontend
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to