dianfu commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r557003451



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1736,6 +1736,9 @@ def from_data_stream(self, data_stream: DataStream, 
*fields: Union[str, Expressi
         .. versionadded:: 1.12.0
         """
         j_data_stream = data_stream._j_data_stream
+        get_gateway().jvm \
+            .org.apache.flink.python.util.PythonConfigUtil.setManagedMemory(
+            j_data_stream.getTransformation(), 
self._j_tenv.getConfig().getConfiguration())

Review comment:
       The configuration maybe configured via flink-conf.yaml and then it's not 
available in self._j_tenv.getConfig().getConfiguration().

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +221,20 @@ private static boolean 
isPythonOperator(StreamOperatorFactory streamOperatorFact
         }
     }
 
+    private static boolean isPythonOperator(Transformation<?> transform) {
+        if (transform instanceof OneInputTransformation

Review comment:
       What about refactor it a bit to make it more readable?
   ```
   if (transform instanceof OneInputTransformation) {
          return isPythonOperator(((OneInputTransformation) 
transform).getOperatorFactory());
   } else if (transform instanceof TwoInputTransformation) {
      return  isPythonOperator(((TwoInputTransformation) 
transform).getOperatorFactory());
   } else {
       Preconditions.checkState(transform instanceof 
AbstractMultipleInputTransformation);
      return isPythonOperator(
                               ((AbstractMultipleInputTransformation) 
transform).getOperatorFactory());
   }
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -128,6 +128,23 @@ private static void chainStreamNode(
         firstStream.setSlotSharingGroup(secondStream.getSlotSharingGroup());
     }
 
+    /** Set Python Operator Use Managed Memory. */
+    public static void setManagedMemory(Transformation<?> transformation, 
Configuration config) {
+        if (config.getBoolean(PythonOptions.USE_MANAGED_MEMORY)) {
+            setManagedMemory(transformation);
+        }
+    }
+
+    private static void setManagedMemory(Transformation<?> transformation) {
+        List<Transformation<?>> inputTransformations = 
transformation.getInputs();

Review comment:
       nit: what about moving this line just before `for (Transformation 
inputTransformation : inputTransformations) ` where it's used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to