afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r397030268
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ########## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map<String, MetricReporterFactory> loadReporterFactories() { - final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List<Tuple2<String, Configuration>> loadReporterConfigurations(Configuration configuration, Set<String> namedReporters) { + final List<Tuple2<String, Configuration>> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set<String> findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: You've mentioned "to get rid of the next few lines", but this Tree data structure is further used as a 
container that is filled with some conditional logic and returned; it is not just about having the input entries sorted. Do you propose to rewrite it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services