afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393570847
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ########## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map<String, MetricReporterFactory> loadReporterFactories() { - final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) { + final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is useful for testing + Set<String> 
namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map<String, MetricReporterFactory> loadAvailableReporterFactories(PluginManager pluginManager) { final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2); - final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator(); + final Iterator<MetricReporterFactory> factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + 
reporterFactories.put(factoryClassName, factory); + LOG.warn(new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() + .toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() Review comment: @AHeise the jar path for existing metrics reporters will currently point to the same file, due to parent-first loading of org.apache.flink packages (even if one of the jars is in the /plugins directory). That could be misleading, so I wanted to keep the note as a reminder to improve this later, once the loading is done differently. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services