KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r414248987



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+               });
+
+               final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, "");
+                       if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for {}. Please make sure {}{}{} is configured.",
+                                       resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+                               continue;
+                       }
+
+                       if (externalResourceFactories.containsKey(driverFactoryClassName)) {
+                               externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config));

Review comment:
       True, this does not seem to be a fatal error for Flink.
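
       For illustration, a minimal sketch of handling a failed driver
       instantiation as a non-fatal condition: log a warning and continue
       with the remaining resources. The `Driver` and `DriverFactory`
       interfaces and the factory names below are hypothetical stand-ins,
       not Flink's actual `ExternalResourceDriver` API, and `System.err`
       replaces the logger to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class NonFatalDriverLoading {

    /** Hypothetical stand-in for ExternalResourceDriver. */
    public interface Driver {}

    /** Hypothetical stand-in for ExternalResourceDriverFactory. */
    public interface DriverFactory {
        Driver createDriver() throws Exception;
    }

    /**
     * Creates one driver per configured resource. A factory that is missing
     * or that throws is reported and skipped, so the remaining resources
     * still get their drivers.
     */
    public static Map<String, Driver> createDrivers(
            Map<String, String> factoryClassByResource,
            Map<String, DriverFactory> factoriesByClassName) {
        final Map<String, Driver> drivers = new HashMap<>();
        for (Map.Entry<String, String> entry : factoryClassByResource.entrySet()) {
            final String resourceName = entry.getKey();
            final DriverFactory factory = factoriesByClassName.get(entry.getValue());
            if (factory == null) {
                System.err.println("No factory found for " + resourceName + ", skipping.");
                continue;
            }
            try {
                drivers.put(resourceName, factory.createDriver());
            } catch (Exception e) {
                // Non-fatal: warn and move on to the next resource.
                System.err.println("Could not create driver for " + resourceName + ": " + e);
            }
        }
        return drivers;
    }

    /** Builds a small assumed configuration: one working, one failing, one missing factory. */
    public static Map<String, Driver> demo() {
        final Map<String, DriverFactory> factories = new HashMap<>();
        factories.put("GoodFactory", () -> new Driver() {});
        factories.put("BrokenFactory", () -> {
            throw new IllegalStateException("init failed");
        });

        final Map<String, String> configured = new LinkedHashMap<>();
        configured.put("gpu", "GoodFactory");
        configured.put("fpga", "BrokenFactory");
        configured.put("tpu", "MissingFactory");
        return createDrivers(configured, factories);
    }

    public static void main(String[] args) {
        System.out.println(demo().keySet()); // prints [gpu]
    }
}
```

       In this sketch only the "gpu" driver ends up in the map; the
       failing and missing factories each produce a warning instead of
       aborting the whole method.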





