[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205657#comment-16205657
 ] 

ASF GitHub Bot commented on FLINK-7502:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4586#discussion_r144804293
  
    --- Diff: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ---
    @@ -114,39 +120,78 @@ public void notifyOfAddedMetric(final Metric metric, 
final String metricName, fi
                        
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
                }
     
    -           final String validMetricName = scope + SCOPE_SEPARATOR + 
CHARACTER_FILTER.filterCharacters(metricName);
    -           final String metricIdentifier = 
group.getMetricIdentifier(metricName);
    +           final String scopedMetricName = getScopedName(metricName, 
group);
    +           final String helpString = metricName + " (scope: " + 
getLogicalScope(group) + ")";
    +
                final Collector collector;
    -           if (metric instanceof Gauge) {
    -                   collector = createGauge((Gauge) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    -           } else if (metric instanceof Counter) {
    -                   collector = createGauge((Counter) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    -           } else if (metric instanceof Meter) {
    -                   collector = createGauge((Meter) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    -           } else if (metric instanceof Histogram) {
    -                   collector = createSummary((Histogram) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +           Integer count = 0;
    +
    +           if 
(!collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
    +                   if (metric instanceof Gauge) {
    +                           collector = newGauge(scopedMetricName, 
helpString, dimensionKeys, dimensionValues, gaugeFrom((Gauge) metric));
    +                   } else if (metric instanceof Counter) {
    +                           collector = newGauge(scopedMetricName, 
helpString, dimensionKeys, dimensionValues, gaugeFrom((Counter) metric));
    +                   } else if (metric instanceof Meter) {
    +                           collector = newGauge(scopedMetricName, 
helpString, dimensionKeys, dimensionValues, gaugeFrom((Meter) metric));
    +                   } else if (metric instanceof Histogram) {
    +                           collector = new 
HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, 
dimensionKeys, dimensionValues);
    +                   } else {
    +                           LOG.warn("Cannot add unknown metric type: {}. 
This indicates that the metric type is not supported by this reporter.",
    +                                   metric.getClass().getName());
    +                           return;
    +                   }
    +                   try {
    +                           collector.register();
    +                   } catch (Exception e) {
    +                           LOG.warn("There was a problem registering 
metric {}.", metricName, e);
    +                   }
                } else {
    -                   LOG.warn("Cannot add unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
    -                           metric.getClass().getName());
    -                   return;
    +                   final AbstractMap.SimpleImmutableEntry<Collector, 
Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
    +                   collector = collectorWithCount.getKey();
    +                   count = collectorWithCount.getValue();
    +                   if (metric instanceof Gauge) {
    +                           ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
    +                   } else if (metric instanceof Counter) {
    +                           ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
    +                   } else if (metric instanceof Meter) {
    +                           ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
    +                   } else if (metric instanceof Histogram) {
    +                           ((HistogramSummaryProxy) 
collector).addChild((Histogram) metric, dimensionValues);
    +                   }
                }
    -           collector.register();
    -           collectorsByMetricName.put(metricName, collector);
    +           collectorsWithCountByMetricName.put(scopedMetricName, new 
AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
    +   }
    +
    +   private static String getScopedName(String metricName, MetricGroup 
group) {
    +           return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR 
+ CHARACTER_FILTER.filterCharacters(metricName);
        }
     
        @Override
        public void notifyOfRemovedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
    -           
CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
    -           collectorsByMetricName.remove(metricName);
    +           final String scopedMetricName = getScopedName(metricName, 
group);
    +           final AbstractMap.SimpleImmutableEntry<Collector, Integer> 
collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
    +           final Integer count = collectorWithCount.getValue();
    +           final Collector collector = collectorWithCount.getKey();
    +           if (count == 1) {
    --- End diff --
    
    You need a separate synchronization mechanism here to prevent 
race-conditions between adding/removing metrics. A metric can be swallowed if 
it is added between retrieving the collector and removing it from the map.


> PrometheusReporter improvements
> -------------------------------
>
>                 Key: FLINK-7502
>                 URL: https://issues.apache.org/jira/browse/FLINK-7502
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.4.0
>            Reporter: Maximilian Bode
>            Assignee: Maximilian Bode
>            Priority: Minor
>             Fix For: 1.4.0
>
>
> * do not throw exceptions on metrics being registered for second time
> * allow port ranges for setups where multiple reporters are on same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore, there is now a minimal http server included 
> in [Prometheus JVM client|https://github.com/prometheus/client_java]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to