GitHub user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5099#discussion_r155620220 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java --- @@ -108,15 +118,36 @@ public static MetricRegistryConfiguration fromConfiguration(Configuration config delim = '.'; } - final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST); + Set<String> includedReporters = reporterListPattern.splitAsStream(configuration.getString(MetricOptions.REPORTERS_LIST, "")) + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is useful for testing + Set<String> namedReporters = new TreeSet<>(String::compareTo); + // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}.", reporterName); --- End diff -- Should we also log the reason for excluding the reporter (i.e., that it is not in the reporters list)?
---