KarmaGYZ commented on a change in pull request #11854: URL: https://github.com/apache/flink/pull/11854#discussion_r414248987
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -76,4 +78,43 @@ return externalResourceConfigs; } + + /** + * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s + * are mapped by its resource name. + */ + public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> { + externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory); + }); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, ""); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not found driver class name for {}. 
Please make sure {}{}{} is configured.", + resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX); + continue; + } + + if (externalResourceFactories.containsKey(driverFactoryClassName)) { + externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config)); Review comment: True, this does not seem to be a fatal error for Flink. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org