[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15465103#comment-15465103 ]
ASF GitHub Bot commented on FLINK-4389: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527637 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); final Map<String, JobMetricStore> jobs = new HashMap<>(); - /** - * Adds a metric to this MetricStore. - * - * @param name the metric identifier - * @param value the metric value - */ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** - * JobManagerMetricStore metric - * format: 0:<user_scope>.<name> - */ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** - * TaskManager metric - * format: 1:<tm_id>:<user_scope>.<name> - */ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: + addMetric(jobManager.metrics, name, metric); + case INFO_CATEGORY_TM: + String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; + tm = taskManagers.get(tmID); if (tm == null) { tm = new TaskManagerMetricStore(); - taskManagers.put(components[1], tm); + taskManagers.put(tmID, tm); } - tm.metrics.put(components[2], value); + addMetric(tm.metrics, name, metric); break; - /** - * Job metric - * format: 2:<job_id>:<user_scope>.<name> - */ - case "2": - if (components.length != 3) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_JOB: + QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; + job = jobs.get(jobInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(jobInfo.jobID, job); } - job.metrics.put(components[2], value); + addMetric(job.metrics, name, metric); break; - /** - * Task metric - * format: 3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name> - * - * As the WebInterface task metric queries currently do not account for subtasks we don't - * divide by subtask and instead use the concatenation of subtask index and metric name as the name. - */ - case "3": - if (components.length != 5) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_TASK: + QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; + job = jobs.get(taskInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(taskInfo.jobID, job); } - task = job.tasks.get(components[2]); + task = job.tasks.get(taskInfo.vertexID); if (task == null) { task = new TaskMetricStore(); - job.tasks.put(components[2], task); + job.tasks.put(taskInfo.vertexID, task); } - task.metrics.put(components[3] + "." + components[4], value); + /** + * As the WebInterface task metric queries currently do not account for subtasks we don't + * divide by subtask and instead use the concatenation of subtask index and metric name as the name. + */ + addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); break; - /** - * Operator metric - * format: 4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name> - * - * As the WebInterface does not account for operators (because it can't) we don't - * divide by operator and instead use the concatenation of subtask index, operator name and metric name - * as the name. - */ - case "4": - if (components.length != 6) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_OPERATOR: + QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; + job = jobs.get(operatorInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(operatorInfo.jobID, job); } - task = job.tasks.get(components[2]); + task = job.tasks.get(operatorInfo.vertexID); if (task == null) { task = new TaskMetricStore(); - job.tasks.put(components[2], task); + job.tasks.put(operatorInfo.vertexID, task); } - task.metrics.put(components[3] + "." + components[4] + "." + components[5], value); + /** + * As the WebInterface does not account for operators (because it can't) we don't + * divide by operator and instead use the concatenation of subtask index, operator name and metric name + * as the name. + */ + addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric); break; default: - LOG.debug("Invalid metric name format: " + name); + LOG.debug("Invalid metric dump category: " + info.getCategory()); } } catch (Exception e) { - LOG.debug("Malformed metric name format: " + name); + LOG.debug("Malformed metric dump.", e); + } + } + + private void addMetric(Map<String, Object> target, String name, MetricDump metric) { + switch (metric.getCategory()) { + case METRIC_CATEGORY_COUNTER: --- End diff -- The same question applies here as well. Why don't we simply check what type the metric instance is? > Expose metrics to Webfrontend > ----------------------------- > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend > Affects Versions: 1.1.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Fix For: pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface -- This message was sent by Atlassian JIRA (v6.3.4#6332)