xintongsong commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r413464851



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);

Review comment:
       It's weird that the method `getXXXList` returns a set rather than a list.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+               });
+
+               final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, "");
+                       if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for {}. Please make sure {}{}{} is configured.",
+                                       resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+                               continue;
+                       }
+
+                       if (externalResourceFactories.containsKey(driverFactoryClassName)) {
+                               externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config));
+                               LOG.info("Add external resources driver for {}.", resourceName);
+                       } else {
+                               LOG.error("Could not find factory class {} for {}. The information might not be exposed/reported.", driverFactoryClassName, resourceName);

Review comment:
       > The information might not be exposed/reported.
   
   Not sure what this message means.
   
   Also, I'm not sure error is the right log level here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+               });
+
+               final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, "");

Review comment:
       `config.getString(String, String)` is deprecated.
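   A possible shape for this lookup without the deprecated overload, sketched in plain Java. In Flink itself this would most likely go through a typed `ConfigOption` (for example one built with `ConfigOptions.key(...).stringType().noDefaultValue()`) and be read via `Configuration#getOptional`. To keep the sketch self-contained, a `Map<String, String>` stands in for the configuration, and the key prefix/suffix values are assumptions, not the constants from this PR.

```java
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

// Sketch only: Map<String, String> stands in for Flink's Configuration, and the
// key prefix/suffix values below are assumed, not taken from this PR.
final class FactoryClassNameLookup {
    private static final Logger LOG = Logger.getLogger(FactoryClassNameLookup.class.getName());

    static final String PREFIX = "external-resource.";            // assumed value
    static final String FACTORY_SUFFIX = ".driver-factory.class"; // assumed value

    /** Returns the configured factory class name, or empty (after logging a warning) if it is missing or blank. */
    static Optional<String> factoryClassName(Map<String, String> config, String resourceName) {
        final String key = PREFIX + resourceName + FACTORY_SUFFIX;
        // Absence is modeled as an empty Optional instead of a "" default value.
        final Optional<String> value = Optional.ofNullable(config.get(key))
                .map(String::trim)
                .filter(s -> !s.isEmpty());
        if (!value.isPresent()) {
            LOG.warning("Could not find the driver factory class name for " + resourceName
                    + ". Please make sure " + key + " is configured.");
        }
        return value;
    }
}
```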

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -76,4 +78,43 @@
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+               });
+
+               final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String driverFactoryClassName = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX, "");
+                       if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for {}. Please make sure {}{}{} is configured.",
+                                       resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX, resourceName, ExternalResourceConstants.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+                               continue;
+                       }
+
+                       if (externalResourceFactories.containsKey(driverFactoryClassName)) {
+                               externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(config));

Review comment:
       `ExternalResourceDriverFactory#createExternalResourceDriver` may throw an exception. This exception should be handled inside the loop: an exception thrown while creating one external resource driver should not prevent the creation of all the other drivers.
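   A minimal sketch of that per-driver error isolation. It uses hypothetical stand-in interfaces (`Driver`, `DriverFactory`, whose factory method takes no configuration) instead of Flink's real `ExternalResourceDriver`/`ExternalResourceDriverFactory`. Whether a failure should be logged and skipped, or collected and rethrown after the loop, is a design choice; this sketch logs and skips.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-ins for ExternalResourceDriver / ExternalResourceDriverFactory.
interface Driver {}

interface DriverFactory {
    Driver createDriver() throws Exception;
}

final class DriverLoader {
    private static final Logger LOG = Logger.getLogger(DriverLoader.class.getName());

    /**
     * Creates one driver per resource. A factory that throws is logged and skipped,
     * so it cannot prevent the remaining drivers from being created.
     */
    static Map<String, Driver> createDrivers(Map<String, DriverFactory> factoriesByResource) {
        final Map<String, Driver> drivers = new HashMap<>();
        for (Map.Entry<String, DriverFactory> entry : factoriesByResource.entrySet()) {
            final String resourceName = entry.getKey();
            try {
                drivers.put(resourceName, entry.getValue().createDriver());
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Could not create the driver for " + resourceName + "; skipping it.", e);
            }
        }
        return Collections.unmodifiableMap(drivers);
    }
}
```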

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() {
                assertTrue(configMap.containsKey(resourceConfigKey));
                assertThat(configMap.get(resourceConfigKey), is(resourceAmount));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() throws Exception {
+               final Configuration config = new Configuration();
+               final String resourceName = "test";
+               final String driverClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory";
+               final PluginManager mockPluginManager = Mockito.mock(PluginManager.class);
+               Mockito.when(mockPluginManager.load(ExternalResourceDriverFactory.class)).thenReturn(IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));

Review comment:
       Mockito could be avoided by introducing a `TestingPluginManager`.
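   A rough sketch of such a `TestingPluginManager`. The `PluginManager` interface here is a hypothetical stand-in reduced to the single `load(Class)` call used by `externalResourceDriversFromConfig`. The real Flink type's exact signature and type bounds may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's PluginManager, reduced to the one call used here.
interface PluginManager {
    <P> Iterator<P> load(Class<P> service);
}

/** Test double that serves preconfigured plugin instances per service class, with no mocking library. */
final class TestingPluginManager implements PluginManager {
    private final Map<Class<?>, List<?>> plugins;

    TestingPluginManager(Map<Class<?>, List<?>> plugins) {
        this.plugins = new HashMap<>(plugins);
    }

    @Override
    public <P> Iterator<P> load(Class<P> service) {
        final List<?> candidates = plugins.getOrDefault(service, Collections.emptyList());
        final List<P> typed = new ArrayList<>();
        for (Object candidate : candidates) {
            // Class#cast fails fast if a test registers an instance of the wrong type.
            typed.add(service.cast(candidate));
        }
        return typed.iterator();
    }
}
```

   In the test above, the Mockito setup could then become something like `new TestingPluginManager(Collections.singletonMap(ExternalResourceDriverFactory.class, Collections.singletonList(new TestingExternalResourceDriverFactory())))`.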

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
##########
@@ -87,6 +94,11 @@ public boolean hasBroadcastVariable(String name) {
                        throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
                }
        }
+
+       @Override
+       public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) {
+               return (Set<T>) Collections.unmodifiableSet(externalResources.get(resourceName));

Review comment:
       Same here. We should add type checks for the external resource infos.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() {
                assertTrue(configMap.containsKey(resourceConfigKey));
                assertThat(configMap.get(resourceConfigKey), is(resourceAmount));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() throws Exception {
+               final Configuration config = new Configuration();
+               final String resourceName = "test";
+               final String driverClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory";

Review comment:
       ```suggestion
                final String driverFactoryClassName = "org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory";
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +138,17 @@
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from environment. Index by the resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) {
+               final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(configuration);
+               final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>();
+               for (Map.Entry<String, ExternalResourceDriver> externalResourceDriver : externalResourceDrivers.entrySet()) {
+                       final Set<? extends ExternalResourceInfo> externalResourceInfos = externalResourceDriver.getValue().retrieveResourceInfo(externalResourceAmountMap.get(externalResourceDriver.getKey()));

Review comment:
       For better readability, rename the loop variable to `entry` and extract:
   ```
   final String resourceName = entry.getKey();
   final ExternalResourceDriver externalResourceDriver = entry.getValue();
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -65,4 +68,21 @@ public void testGetExternalResourceConfigMap() {
                assertTrue(configMap.containsKey(resourceConfigKey));
                assertThat(configMap.get(resourceConfigKey), is(resourceAmount));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() throws Exception {

Review comment:
       I think we also need some negative test cases, e.g.:
   - factory class is not configured
   - plugin cannot be found
   - user provided factory failed to create the driver

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +138,17 @@
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from environment. Index by the resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) {
+               final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(configuration);
+               final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>();
+               for (Map.Entry<String, ExternalResourceDriver> externalResourceDriver : externalResourceDrivers.entrySet()) {
+                       final Set<? extends ExternalResourceInfo> externalResourceInfos = externalResourceDriver.getValue().retrieveResourceInfo(externalResourceAmountMap.get(externalResourceDriver.getKey()));

Review comment:
       What if there's no corresponding amount for the given driver?
   The current implementation may lead to an NPE if `externalResourceAmountMap#get` returns `null`, because the `Long` is unboxed when it is passed to `retrieveResourceInfo(long)`.
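   One way to avoid the NPE, sketched with hypothetical stand-in types (`InfoDriver`, with `String` in place of `ExternalResourceInfo`). It looks the amount up as a boxed `Long` and skips the resource with a warning when the amount is missing. The loop also uses the `resourceName`/driver locals suggested in the readability comment. Skipping instead of failing is an assumption; throwing an exception could be equally reasonable.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical stand-in for ExternalResourceDriver; String stands in for ExternalResourceInfo.
interface InfoDriver {
    Set<String> retrieveResourceInfo(long amount);
}

final class ResourceInfoCollector {
    private static final Logger LOG = Logger.getLogger(ResourceInfoCollector.class.getName());

    static Map<String, Set<String>> collect(Map<String, InfoDriver> drivers, Map<String, Long> amounts) {
        final Map<String, Set<String>> result = new HashMap<>();
        for (Map.Entry<String, InfoDriver> entry : drivers.entrySet()) {
            final String resourceName = entry.getKey();
            final InfoDriver driver = entry.getValue();
            // Keep the boxed Long: passing a null Long to a long parameter would throw an NPE on unboxing.
            final Long amount = amounts.get(resourceName);
            if (amount == null) {
                LOG.warning("No amount configured for external resource " + resourceName + "; skipping it.");
                continue;
            }
            result.put(resourceName, driver.retrieveResourceInfo(amount));
        }
        return Collections.unmodifiableMap(result);
    }
}
```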

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
##########
@@ -175,6 +177,11 @@ public DistributedCache getDistributedCache() {
                return this.distributedCache;
        }
 
+       @Override
+       public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) {
+               throw new UnsupportedOperationException("Do not support external resource in current environment");
+       }
+

Review comment:
       I'm not familiar with the `RuntimeContext`. Could you explain why some of the subclasses support this API while others do not? Specifically, why do `RuntimeUDFContext` and its subclass `IterationRuntimeUDFContext` not support this API?
   
   Moreover, throwing an `UnsupportedOperationException` is not behavior shared by all the subclasses, so I would not add this implementation to the abstract base class. Leaving it out forces the concrete subclasses to decide explicitly whether or not to support this API.
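   To illustrate the suggestion, here is a heavily simplified, hypothetical hierarchy (not Flink's actual `RuntimeContext` classes). The base class declares the method abstract, so each concrete context must either implement it or explicitly throw.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified hierarchy; not Flink's actual RuntimeContext classes.
abstract class BaseUdfContext {
    // Deliberately abstract: no default implementation, so every concrete
    // subclass must decide explicitly whether it supports external resources.
    abstract Set<String> getExternalResourceInfos(String resourceName);
}

final class DistributedContext extends BaseUdfContext {
    private final Map<String, Set<String>> externalResources;

    DistributedContext(Map<String, Set<String>> externalResources) {
        this.externalResources = externalResources;
    }

    @Override
    Set<String> getExternalResourceInfos(String resourceName) {
        return Collections.unmodifiableSet(
                externalResources.getOrDefault(resourceName, Collections.emptySet()));
    }
}

final class LocalContext extends BaseUdfContext {
    @Override
    Set<String> getExternalResourceInfos(String resourceName) {
        // This subclass explicitly opts out.
        throw new UnsupportedOperationException("External resources are not supported in this context.");
    }
}
```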

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +138,17 @@
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from environment. Index by the resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfo(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) {

Review comment:
       It seems that for all the callers of this method, both arguments are derived from an `Environment`.
   Can this utility function just take an `Environment` as its argument?
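   A sketch of what an `Environment`-based signature could look like. `TaskEnvironment` and its `getExternalResourceAmounts()` accessor are hypothetical. In Flink, the amounts would presumably still be derived from the configuration reachable through the `Environment`. A signature like this would also let a test exercise the "from environment" path with a small fake environment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins; String stands in for ExternalResourceInfo.
interface ResourceDriver {
    Set<String> retrieveResourceInfo(long amount);
}

interface TaskEnvironment {
    Map<String, ResourceDriver> getExternalResourceDrivers();

    Map<String, Long> getExternalResourceAmounts(); // hypothetical accessor
}

final class ExternalResourceInfoUtil {
    /** Collects resource infos using only what the environment provides. */
    static Map<String, Set<String>> getExternalResourceInfo(TaskEnvironment env) {
        final Map<String, Long> amounts = env.getExternalResourceAmounts();
        final Map<String, Set<String>> infos = new HashMap<>();
        env.getExternalResourceDrivers().forEach((resourceName, driver) -> {
            final Long amount = amounts.get(resourceName);
            if (amount != null) { // resources without an amount are skipped
                infos.put(resourceName, driver.retrieveResourceInfo(amount));
            }
        });
        return Collections.unmodifiableMap(infos);
    }
}
```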

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -85,4 +88,20 @@ public void testConstructExternalResourceDriversFromConfig() throws Exception {
                assertThat(externalResourceDrivers.size(), is(1));
                assertTrue(externalResourceDrivers.get(resourceName) instanceof TestingExternalResourceDriver);
        }
+
+       @Test
+       public void testGetExternalResourceInfoFromEnvironment() {

Review comment:
       It seems to me this test case does not cover getting the info from the environment.
   It would, if we changed `getExternalResourceInfo` to accept an `Environment` as an argument.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
##########
@@ -140,6 +147,11 @@ public String getOperatorUniqueID() {
                return operatorUniqueID;
        }
 
+       @Override
+       public <T extends ExternalResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) {
+               return (Set<T>) Collections.unmodifiableSet(externalResources.get(resourceName));

Review comment:
       I think we should check the type of the external resource infos here. Isn't that the purpose of passing in `externalResourceType`?
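   A minimal sketch of such a type check. A hypothetical `ResourceInfo` interface and example `GpuInfo`/`FpgaInfo` types stand in for Flink's `ExternalResourceInfo`. The sketch replaces the unchecked `(Set<T>)` cast with a per-element `Class#isInstance`/`Class#cast`. Two behaviors are assumptions: rejecting mismatches with an exception (rather than silently filtering them out), and returning an empty set for unknown resource names.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ExternalResourceInfo, plus two example implementations.
interface ResourceInfo {}

final class GpuInfo implements ResourceInfo {}

final class FpgaInfo implements ResourceInfo {}

final class TypedResourceLookup {
    private final Map<String, Set<? extends ResourceInfo>> externalResources;

    TypedResourceLookup(Map<String, Set<? extends ResourceInfo>> externalResources) {
        this.externalResources = externalResources;
    }

    <T extends ResourceInfo> Set<T> getExternalResourceInfos(String resourceName, Class<T> externalResourceType) {
        final Set<? extends ResourceInfo> infos = externalResources.get(resourceName);
        if (infos == null) {
            return Collections.emptySet(); // unknown resource name
        }
        final Set<T> typed = new LinkedHashSet<>();
        for (ResourceInfo info : infos) {
            // Verify each element instead of relying on an unchecked (Set<T>) cast.
            if (!externalResourceType.isInstance(info)) {
                throw new IllegalArgumentException("External resource info for " + resourceName
                        + " is of type " + info.getClass().getName()
                        + ", not " + externalResourceType.getName() + ".");
            }
            typed.add(externalResourceType.cast(info));
        }
        return Collections.unmodifiableSet(typed);
    }
}
```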

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
##########
@@ -78,7 +82,8 @@ public StreamingRuntimeContext(
                        operator.getMetricGroup(),
                        operator.getOperatorID(),
                        operator.getProcessingTimeService(),
-                       operator.getKeyedStateStore());
+                       operator.getKeyedStateStore(),
+                       Collections.emptyMap());

Review comment:
       Can we derive the external resources from `env`, instead of just passing 
in an empty map here?

##########
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
##########
@@ -198,6 +199,11 @@ public GlobalAggregateManager getGlobalAggregateManager() {
                throw new UnsupportedOperationException(ERROR_MSG);
        }
 
+       @Override
+       public Map<String, ExternalResourceDriver> getExternalResourceDrivers() {
+               return Collections.emptyMap();
+       }

Review comment:
       Should we throw an `UnsupportedOperationException` here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -79,6 +80,26 @@
                return externalResourceConfigs;
        }
 
+       /**
+        * Get the map of resource name and amount of all of enabled external resources.
+        */
+       private static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0);
+                       externalResourceAmountMap.put(resourceName, amount);
+               }

Review comment:
       Same here.
   - A deprecated method is used.
   - Existence check (log a warning if the amount is absent).
   - Sanity check on the configured amount.
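   A sketch of those three points for the amount lookup. As before, a `Map<String, String>` stands in for the configuration, and the key prefix/suffix values are assumed. Two choices here are assumptions about the intended semantics: treating a non-positive or unparsable amount as invalid, and skipping the resource with a warning instead of failing.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.logging.Logger;

// Sketch only: Map<String, String> stands in for Flink's Configuration; key parts are assumed.
final class AmountLookup {
    private static final Logger LOG = Logger.getLogger(AmountLookup.class.getName());

    static final String PREFIX = "external-resource."; // assumed value
    static final String AMOUNT_SUFFIX = ".amount";      // assumed value

    /** Returns the configured amount, or empty (after logging a warning) if it is absent or invalid. */
    static OptionalLong amount(Map<String, String> config, String resourceName) {
        final String key = PREFIX + resourceName + AMOUNT_SUFFIX;
        final String raw = config.get(key);
        if (raw == null) { // existence check
            LOG.warning("The amount for " + resourceName + " is not configured. Please set " + key + ".");
            return OptionalLong.empty();
        }
        final long amount;
        try {
            amount = Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            LOG.warning("Invalid amount '" + raw + "' for " + resourceName + ".");
            return OptionalLong.empty();
        }
        if (amount <= 0) { // sanity check
            LOG.warning("The amount for " + resourceName + " must be positive, but was " + amount + ".");
            return OptionalLong.empty();
        }
        return OptionalLong.of(amount);
    }
}
```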

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
##########
@@ -41,10 +44,14 @@
 public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
 
        private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
-       
+
+       private final Map<String, Set<? extends ExternalResourceInfo>> externalResources;
+
        public DistributedRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
-                                                                                       Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators, MetricGroup metrics) {
+                                                                               Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
+                                                                               MetricGroup metrics, Map<String, Set<? extends ExternalResourceInfo>> externalResources) {
                super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
+               this.externalResources = externalResources;

Review comment:
       If you look at the callers of this constructor, most of the arguments seem to be derived from an `Environment`. Maybe we can just pass in an `Environment` to reduce the number of arguments.
   
   Alternatively, if we do not want to expose the entire `Environment` to this class, we can consider a builder/factory that takes an `Environment` and generates the constructor arguments.
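   A sketch of the factory alternative. A hypothetical, narrowed-down `UdfEnvironment` interface and `UdfContext` class stand in for Flink's `Environment` and `DistributedRuntimeUDFContext`. The constructor keeps explicit arguments, so the class does not depend on `Environment`. A static factory method derives those arguments from the environment in one place.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical narrow view of Environment; the real interface exposes far more.
interface UdfEnvironment {
    String getTaskName();

    Map<String, Set<String>> getExternalResourceInfos();
}

final class UdfContext {
    private final String taskName;
    private final Map<String, Set<String>> externalResources;

    // Explicit arguments: the context itself never sees the Environment.
    UdfContext(String taskName, Map<String, Set<String>> externalResources) {
        this.taskName = taskName;
        this.externalResources = Collections.unmodifiableMap(externalResources);
    }

    /** Derives all constructor arguments from the environment in one place. */
    static UdfContext fromEnvironment(UdfEnvironment env) {
        return new UdfContext(env.getTaskName(), env.getExternalResourceInfos());
    }

    String getTaskName() {
        return taskName;
    }

    Set<String> getExternalResourceInfos(String resourceName) {
        return externalResources.getOrDefault(resourceName, Collections.emptySet());
    }
}
```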

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
##########
@@ -33,7 +33,7 @@ public TestingExternalResourceDriver(Configuration config) {
        }
 
        @Override
-       public Set<ExternalResourceInfo> retrieveResourceInfo(long amount) {
+       public Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) {

Review comment:
       Shouldn't this change belong to the previous commit?



