Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527637 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); final Map<String, JobMetricStore> jobs = new HashMap<>(); - /** - * Adds a metric to this MetricStore. - * - * @param name the metric identifier - * @param value the metric value - */ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** - * JobManagerMetricStore metric - * format: 0:<user_scope>.<name> - */ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** - * TaskManager metric - * format: 1:<tm_id>:<user_scope>.<name> - */ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: + addMetric(jobManager.metrics, name, metric); + case INFO_CATEGORY_TM: + String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; + tm = taskManagers.get(tmID); if (tm == null) { tm = new TaskManagerMetricStore(); - taskManagers.put(components[1], tm); + taskManagers.put(tmID, tm); } - tm.metrics.put(components[2], value); + addMetric(tm.metrics, name, metric); break; - /** - * Job metric - * format: 2:<job_id>:<user_scope>.<name> - */ - case "2": - if (components.length != 3) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_JOB: + QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; + job = jobs.get(jobInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(jobInfo.jobID, job); } - job.metrics.put(components[2], value); + addMetric(job.metrics, name, metric); break; - /** - * Task metric - * format: 3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name> - * - * As the WebInterface task metric queries currently do not account for subtasks we don't - * divide by subtask and instead use the concatenation of subtask index and metric name as the name. - */ - case "3": - if (components.length != 5) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_TASK: + QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; + job = jobs.get(taskInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(taskInfo.jobID, job); } - task = job.tasks.get(components[2]); + task = job.tasks.get(taskInfo.vertexID); if (task == null) { task = new TaskMetricStore(); - job.tasks.put(components[2], task); + job.tasks.put(taskInfo.vertexID, task); } - task.metrics.put(components[3] + "." + components[4], value); + /** + * As the WebInterface task metric queries currently do not account for subtasks we don't + * divide by subtask and instead use the concatenation of subtask index and metric name as the name. + */ + addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); break; - /** - * Operator metric - * format: 4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name> - * - * As the WebInterface does not account for operators (because it can't) we don't - * divide by operator and instead use the concatenation of subtask index, operator name and metric name - * as the name. - */ - case "4": - if (components.length != 6) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_OPERATOR: + QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; + job = jobs.get(operatorInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(operatorInfo.jobID, job); } - task = job.tasks.get(components[2]); + task = job.tasks.get(operatorInfo.vertexID); if (task == null) { task = new TaskMetricStore(); - job.tasks.put(components[2], task); + job.tasks.put(operatorInfo.vertexID, task); } - task.metrics.put(components[3] + "." + components[4] + "." + components[5], value); + /** + * As the WebInterface does not account for operators (because it can't) we don't + * divide by operator and instead use the concatenation of subtask index, operator name and metric name + * as the name. + */ + addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric); break; default: - LOG.debug("Invalid metric name format: " + name); + LOG.debug("Invalid metric dump category: " + info.getCategory()); } } catch (Exception e) { - LOG.debug("Malformed metric name format: " + name); + LOG.debug("Malformed metric dump.", e); + } + } + + private void addMetric(Map<String, Object> target, String name, MetricDump metric) { + switch (metric.getCategory()) { + case METRIC_CATEGORY_COUNTER: --- End diff -- The same question applies here as well. Why don't we simply check what type the metric instance is?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---