[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205657#comment-16205657 ]
ASF GitHub Bot commented on FLINK-7502: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4586#discussion_r144804293 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java --- @@ -114,39 +120,78 @@ public void notifyOfAddedMetric(final Metric metric, final String metricName, fi dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); } - final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); - final String metricIdentifier = group.getMetricIdentifier(metricName); + final String scopedMetricName = getScopedName(metricName, group); + final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; + final Collector collector; - if (metric instanceof Gauge) { - collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); - } else if (metric instanceof Counter) { - collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); - } else if (metric instanceof Meter) { - collector = createGauge((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); - } else if (metric instanceof Histogram) { - collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); + Integer count = 0; + + if (!collectorsWithCountByMetricName.containsKey(scopedMetricName)) { + if (metric instanceof Gauge) { + collector = newGauge(scopedMetricName, helpString, dimensionKeys, dimensionValues, gaugeFrom((Gauge) metric)); + } else if (metric instanceof Counter) { + collector = newGauge(scopedMetricName, helpString, dimensionKeys, dimensionValues, gaugeFrom((Counter) metric)); + } else if (metric instanceof Meter) { + collector = newGauge(scopedMetricName, helpString, dimensionKeys, 
dimensionValues, gaugeFrom((Meter) metric)); + } else if (metric instanceof Histogram) { + collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues); + } else { + LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + return; + } + try { + collector.register(); + } catch (Exception e) { + LOG.warn("There was a problem registering metric {}.", metricName, e); + } } else { - LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - return; + final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + collector = collectorWithCount.getKey(); + count = collectorWithCount.getValue(); + if (metric instanceof Gauge) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); + } else if (metric instanceof Counter) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); + } else if (metric instanceof Meter) { + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); + } else if (metric instanceof Histogram) { + ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); + } } - collector.register(); - collectorsByMetricName.put(metricName, collector); + collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); + } + + private static String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); } @Override public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) 
{ - CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName)); - collectorsByMetricName.remove(metricName); + final String scopedMetricName = getScopedName(metricName, group); + final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + final Integer count = collectorWithCount.getValue(); + final Collector collector = collectorWithCount.getKey(); + if (count == 1) { --- End diff -- You need a separate synchronization mechanism here to prevent race conditions between adding and removing metrics. A metric can be silently lost if another thread adds it after this method has retrieved the collector but before it removes the collector from the map. > PrometheusReporter improvements > ------------------------------- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.4.0 > Reporter: Maximilian Bode > Assignee: Maximilian Bode > Priority: Minor > Fix For: 1.4.0 > > > * do not throw exceptions when a metric is registered a second time > * allow port ranges for setups where multiple reporters run on the same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore; there is now a minimal HTTP server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)