Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2363#discussion_r77527637
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---
    @@ -35,109 +46,111 @@
        final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
        final Map<String, JobMetricStore> jobs = new HashMap<>();
     
    -   /**
    -    * Adds a metric to this MetricStore.
    -    *
    -    * @param name  the metric identifier
    -    * @param value the metric value
    -    */
    -   public void add(String name, Object value) {
    -           TaskManagerMetricStore tm;
    -           JobMetricStore job;
    -           TaskMetricStore task;
    -
    +   public void add(MetricDump metric) {
                try {
    -                   String[] components = name.split(":");
    -                   switch (components[0]) {
    -                           /**
    -                            * JobManagerMetricStore metric
    -                            * format: 0:<user_scope>.<name>
    -                            */
    -                           case "0":
    -                                   jobManager.metrics.put(components[1], 
value);
    -                                   break;
    -                           /**
    -                            * TaskManager metric
    -                            * format: 1:<tm_id>:<user_scope>.<name>
    -                            */
    -                           case "1":
    -                                   if (components.length != 3) {
    -                                           break;
    -                                   }
    -                                   tm = taskManagers.get(components[1]);
    +                   QueryScopeInfo info = metric.scopeInfo;
    +                   TaskManagerMetricStore tm;
    +                   JobMetricStore job;
    +                   TaskMetricStore task;
    +
    +                   String name = info.scope.isEmpty()
    +                           ? metric.name
    +                           : info.scope + "." + metric.name;
    +                   
    +                   if (name.isEmpty()) { // malformed transmission
    +                           return;
    +                   }
    +
    +                   switch (info.getCategory()) {
    +                           case INFO_CATEGORY_JM:
    +                                   addMetric(jobManager.metrics, name, 
metric);
    +                           case INFO_CATEGORY_TM:
    +                                   String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
    +                                   tm = taskManagers.get(tmID);
                                        if (tm == null) {
                                                tm = new 
TaskManagerMetricStore();
    -                                           taskManagers.put(components[1], 
tm);
    +                                           taskManagers.put(tmID, tm);
                                        }
    -                                   tm.metrics.put(components[2], value);
    +                                   addMetric(tm.metrics, name, metric);
                                        break;
    -                           /**
    -                            * Job metric
    -                            * format: 2:<job_id>:<user_scope>.<name>
    -                            */
    -                           case "2":
    -                                   if (components.length != 3) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_JOB:
    +                                   QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
    +                                   job = jobs.get(jobInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(jobInfo.jobID, job);
                                        }
    -                                   job.metrics.put(components[2], value);
    +                                   addMetric(job.metrics, name, metric);
                                        break;
    -                           /**
    -                            * Task metric
    -                            * format: 
3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name>
    -                            *
    -                            * As the WebInterface task metric queries 
currently do not account for subtasks we don't 
    -                            * divide by subtask and instead use the 
concatenation of subtask index and metric name as the name. 
    -                            */
    -                           case "3":
    -                                   if (components.length != 5) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_TASK:
    +                                   QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
    +                                   job = jobs.get(taskInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(taskInfo.jobID, job);
                                        }
    -                                   task = job.tasks.get(components[2]);
    +                                   task = job.tasks.get(taskInfo.vertexID);
                                        if (task == null) {
                                                task = new TaskMetricStore();
    -                                           job.tasks.put(components[2], 
task);
    +                                           
job.tasks.put(taskInfo.vertexID, task);
                                        }
    -                                   task.metrics.put(components[3] + "." + 
components[4], value);
    +                                   /**
    +                                    * As the WebInterface task metric 
queries currently do not account for subtasks we don't 
    +                                    * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name. 
    +                                    */
    +                                   addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
                                        break;
    -                           /**
    -                            * Operator metric
    -                            * format: 
4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name>
    -                            *
    -                            * As the WebInterface does not account for 
operators (because it can't) we don't 
    -                            * divide by operator and instead use the 
concatenation of subtask index, operator name and metric name 
    -                            * as the name.
    -                            */
    -                           case "4":
    -                                   if (components.length != 6) {
    -                                           break;
    -                                   }
    -                                   job = jobs.get(components[1]);
    +                           case INFO_CATEGORY_OPERATOR:
    +                                   QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
    +                                   job = jobs.get(operatorInfo.jobID);
                                        if (job == null) {
                                                job = new JobMetricStore();
    -                                           jobs.put(components[1], job);
    +                                           jobs.put(operatorInfo.jobID, 
job);
                                        }
    -                                   task = job.tasks.get(components[2]);
    +                                   task = 
job.tasks.get(operatorInfo.vertexID);
                                        if (task == null) {
                                                task = new TaskMetricStore();
    -                                           job.tasks.put(components[2], 
task);
    +                                           
job.tasks.put(operatorInfo.vertexID, task);
                                        }
    -                                   task.metrics.put(components[3] + "." + 
components[4] + "." + components[5], value);
    +                                   /**
    +                                    * As the WebInterface does not account 
for operators (because it can't) we don't 
    +                                    * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name 
    +                                    * as the name.
    +                                    */
    +                                   addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
                                        break;
                                default:
    -                                   LOG.debug("Invalid metric name format: 
" + name);
    +                                   LOG.debug("Invalid metric dump 
category: " + info.getCategory());
                        }
                } catch (Exception e) {
    -                   LOG.debug("Malformed metric name format: " + name);
    +                   LOG.debug("Malformed metric dump.", e);
    +           }
    +   }
    +
    +   private void addMetric(Map<String, Object> target, String name, 
MetricDump metric) {
    +           switch (metric.getCategory()) {
    +                   case METRIC_CATEGORY_COUNTER:
    --- End diff --
    
    The same question applies here as well. Why don't we simply check what type 
the metric instance is?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to