xintongsong commented on a change in pull request #11854: URL: https://github.com/apache/flink/pull/11854#discussion_r413464851
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -76,4 +78,43 @@ return externalResourceConfigs; } + + /** + * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s + * are mapped by its resource name. + */ + public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception { + final Set<String> resourceSet = getExternalResourceList(config); Review comment: It's weird that the method `getXXXList` does not return a list but a set instead. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -76,4 +78,43 @@ return externalResourceConfigs; } + + /** + * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s + * are mapped by its resource name. 
+ */ + public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> { + externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory); + }); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, ""); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not found driver class name for {}. Please make sure {}{}{} is configured.", + resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX); + continue; + } + + if (externalResourceFactories.containsKey(driverFactoryClassName)) { + externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config)); + LOG.info("Add external resources driver for {}.", resourceName); + } else { + LOG.error("Could not find factory class {} for {}. The information might not be exposed/reported.", driverFactoryClassName, resourceName); Review comment: > The information might not be exposed/reported. Not sure what this message means. 
Also, not sure about the error log level. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -76,4 +78,43 @@ return externalResourceConfigs; } + + /** + * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s + * are mapped by its resource name. + */ + public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> { + externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory); + }); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, ""); Review comment: Deprecated `config.getString(String, String)` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -76,4 +78,43 @@ return externalResourceConfigs; } + + /** + * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s + * are mapped by its resource name. 
+ */ + public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> { + externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory); + }); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, ""); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not found driver class name for {}. Please make sure {}{}{} is configured.", + resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX); + continue; + } + + if (externalResourceFactories.containsKey(driverFactoryClassName)) { + externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config)); Review comment: `ExternalResourceDriverFactory#createExternalResourceDriver` may throw exception. This exception should be handled inside the loop. An exception thrown from creating one external resource driver should not prevent creation of all the other drivers. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() { assertTrue(configMap.containsKey(resourceConfigKey)); assertThat(configMap.get(resourceConfigKey), is(resourceAmount)); } + + @Test + public void testConstructExternalResourceDriversFromConfig() throws Exception { + final Configuration config = new Configuration(); + final String resourceName = "test"; + final String driverClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory"; + final PluginManager mockPluginManager = Mockito.mock(PluginManager.class); + Mockito.when(mockPluginManager.load(ExternalResourceDriverFactory.class)).thenReturn(IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); Review comment: Mockito could be avoided by introducing a `TestingPluginManager`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java ########## @@ -87,6 +94,11 @@ public boolean hasBroadcastVariable(String name) { throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set."); } } + + @Override + public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) { + return (Set<T>) Collections.unmodifiableSet(externalResources.get(resourceName)); Review comment: Same here. We should add type checks for the external resource infos. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() { assertTrue(configMap.containsKey(resourceConfigKey)); assertThat(configMap.get(resourceConfigKey), is(resourceAmount)); } + + @Test + public void testConstructExternalResourceDriversFromConfig() throws Exception { + final Configuration config = new Configuration(); + final String resourceName = "test"; + final String driverClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory"; Review comment: ```suggestion final String driverFactoryClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory"; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -117,4 +138,17 @@ return externalResourceDrivers; } + + /** + * Get the external resource information from environment. Index by the resourceName. + */ + public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) { + final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(configuration); + final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>(); + for (Map.Entry<String, ExternalResourceDriver> externalResourceDriver : externalResourceDrivers.entrySet()) { + final Set<? 
extends ExternalResourceInfo> externalResourceInfos = externalResourceDriver.getValue().retrieveResourceInfo(externalResourceAmountMap.get(externalResourceDriver.getKey())); Review comment: For better readability: ``` final String resourceName = entry.getKey(); final ExternalResourceDriver driver = entry.getValue(); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() { assertTrue(configMap.containsKey(resourceConfigKey)); assertThat(configMap.get(resourceConfigKey), is(resourceAmount)); } + + @Test + public void testConstructExternalResourceDriversFromConfig() throws Exception { Review comment: I think we also need some negative test cases, e.g.: - factory class is not configured - plugin cannot be found - user provided factory failed to create the driver ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -117,4 +138,17 @@ return externalResourceDrivers; } + + /** + * Get the external resource information from environment. Index by the resourceName. + */ + public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) { + final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(configuration); + final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>(); + for (Map.Entry<String, ExternalResourceDriver> externalResourceDriver : externalResourceDrivers.entrySet()) { + final Set<? extends ExternalResourceInfo> externalResourceInfos = externalResourceDriver.getValue().retrieveResourceInfo(externalResourceAmountMap.get(externalResourceDriver.getKey())); Review comment: What if for the given driver, there's no corresponding amount? 
The current implementation may lead to an NPE if `externalResourceAmountMap#get` returns `null`. ########## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ########## @@ -175,6 +177,11 @@ public DistributedCache getDistributedCache() { return this.distributedCache; } + @Override + public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) { + throw new UnsupportedOperationException("Do not support external resource in current environment"); + } + Review comment: I'm not familiar with the `RuntimeContext`. Could you explain why some of the subclasses support this API while others do not? To be specific, why do `RuntimeUDFContext` and its subclass `IterationRuntimeUDFContext` not support this API? Moreover, I think throwing an `UnsupportedOperationException` is not a common behavior of all the subclasses, thus I would not add this implementation in this abstract base class. The benefit is to force the concrete subclasses to explicitly decide whether or not to support this API. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -117,4 +138,17 @@ return externalResourceDrivers; } + + /** + * Get the external resource information from environment. Index by the resourceName. + */ + public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) { Review comment: It seems for all the callers of this method, the two arguments are both derived from an `Environment`. Can this utils function just take an `Environment` as an argument? 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -85,4 +88,20 @@ public void testConstructExternalResourceDriversFromConfig() throws Exception { assertThat(externalResourceDrivers.size(), is(1)); assertTrue(externalResourceDrivers.get(resourceName) instanceof TestingExternalResourceDriver); } + + @Test + public void testGetExternalResourceInfoFromEnvironment() { Review comment: It seems to me this test case does not cover getting info from the environment. It would if we changed `getExternalResourceInfo` to accept an `Environment` as an argument. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ########## @@ -140,6 +147,11 @@ public String getOperatorUniqueID() { return operatorUniqueID; } + @Override + public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) { + return (Set<T>) Collections.unmodifiableSet(externalResources.get(resourceName)); Review comment: I think we should check the type of the external resource infos here. Isn't this the purpose of passing in `externalResourceType`? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ########## @@ -78,7 +82,8 @@ public StreamingRuntimeContext( operator.getMetricGroup(), operator.getOperatorID(), operator.getProcessingTimeService(), - operator.getKeyedStateStore()); + operator.getKeyedStateStore(), + Collections.emptyMap()); Review comment: Can we derive the external resources from `env`, instead of just passing in an empty map here? 
########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java ########## @@ -198,6 +199,11 @@ public GlobalAggregateManager getGlobalAggregateManager() { throw new UnsupportedOperationException(ERROR_MSG); } + @Override + public Map<String, ExternalResourceDriver> getExternalResourceDrivers() { + return Collections.emptyMap(); + } Review comment: Should we throw an `UnsupportedOperationException` here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -79,6 +80,26 @@ return externalResourceConfigs; } + /** + * Get the map of resource name and amount of all of enabled external resources. + */ + private static Map<String, Long> getExternalResourceAmountMap(Configuration config) { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceAmountMap = new HashMap<>(); + for (String resourceName: resourceSet) { + final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0); + externalResourceAmountMap.put(resourceName, amount); + } Review comment: Same here. - Deprecated method used. - Existence check (warning on absence) - Sanity check. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java ########## @@ -41,10 +44,14 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>(); - + + private final Map<String, Set<? 
extends ExternalResourceInfo>> externalResources; + public DistributedRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, - Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators, MetricGroup metrics) { + Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators, + MetricGroup metrics, Map<String, Set<? extends ExternalResourceInfo>> externalResources) { super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics); + this.externalResources = externalResources; Review comment: If you look at the callers of this constructor, it seems that most of the arguments are derived from an `Environment`. Maybe we can just pass in an `Environment` to reduce the amount of arguments. Alternatively, if we do not want to expose the entire `Environment` to this class, we can consider a builder/factory that takes an `Environment` and generates the constructor arguments. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java ########## @@ -33,7 +33,7 @@ public TestingExternalResourceDriver(Configuration config) { } @Override - public Set<ExternalResourceInfo> retrieveResourceInfo(long amount) { + public Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) { Review comment: Shouldn't this change belong to the previous commit? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org