KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r414256108



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled 
external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> 
externalResourceDriversFromConfig(Configuration config, PluginManager 
pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = 
pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> 
externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               
externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(),
 externalResourceDriverFactory);
+               });
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String driverFactoryClassName = 
config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + 
ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, "");
+                       if 
(StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for 
{}. Please make sure {}{}{} is configured.",
+                                       resourceName, 
ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, 
ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+                               continue;
+                       }
+
+                       if 
(externalResourceFactories.containsKey(driverFactoryClassName)) {
+                               externalResourceDrivers.put(resourceName, 
externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config));
+                               LOG.info("Add external resources driver for 
{}.", resourceName);
+                       } else {
+                               LOG.error("Could not find factory class {} for 
{}. The information might not be exposed/reported.", driverFactoryClassName, 
resourceName);

Review comment:
       I think it should be error level because user explicit set the 
`driverFactoryClassName` but we could not find that class from `PluginManager`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to