Myasuka commented on a change in pull request #13500:
URL: https://github.com/apache/flink/pull/13500#discussion_r503146610



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -508,12 +515,18 @@ public void setStateBackend(StateBackend backend) {
                if (backend != null) {
                        try {
                                InstantiationUtil.writeObjectToConfig(backend, 
this.config, STATE_BACKEND);
+                               
this.config.setBoolean(STATE_BACKEND_USE_MANAGED_MEMORY, 
backend.useManagedMemory());

Review comment:
       For the purpose to test method, I think we should use 
`setStateBackendUsesManagedMemory(backend.useManagedMemory());` here

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -101,6 +101,10 @@
        private static final String TIME_CHARACTERISTIC = "timechar";
 
        private static final String MANAGED_MEMORY_FRACTION_PREFIX = 
"managedMemFraction.";
+       private static final ConfigOption<Boolean> 
STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions

Review comment:
       Add description for this config could help more.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -233,6 +236,41 @@ public static StateBackend 
fromApplicationOrConfigOrDefault(
                return backend;
        }
 
+       /**
+        * Checks whether state backend uses managed memory, without having to 
deserialize or load the state backend.
+        * @param config Cluster configuration.
+        * @param stateBackendFromApplicationUsesManagedMemory Whether the 
application-defined backend uses Flink's managed
+        *                                                       memory. Empty 
if application has not defined a backend.
+        * @param classLoader User code classloader.
+        * @return Whether the state backend uses managed memory.
+        */
+       public static boolean 
stateBackendFromApplicationOrConfigOrDefaultUseManagedMemory(
+                       Configuration config,
+                       Optional<Boolean> 
stateBackendFromApplicationUsesManagedMemory,
+                       ClassLoader classLoader) {
+
+               checkNotNull(config, "config");
+
+               // (1) the application defined state backend has precedence
+               if (stateBackendFromApplicationUsesManagedMemory.isPresent()) {
+                       return 
stateBackendFromApplicationUsesManagedMemory.get();
+               }
+
+               // (2) check if the config defines a state backend
+               try {
+                       final StateBackend fromConfig = 
loadStateBackendFromConfig(config, classLoader, null);

Review comment:
       We could pass parameter `LOG` instead of `null` here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to