dianfu commented on a change in pull request #14621: URL: https://github.com/apache/flink/pull/14621#discussion_r557882505
########## File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java ########## @@ -270,4 +296,39 @@ private static boolean isExecuteInBatchMode( } return !existsUnboundedSource; } + + public static Configuration getConfig(StreamExecutionEnvironment env, TableConfig tableConfig) { Review comment: What about rename to getMergedConfig? ########## File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java ########## @@ -270,4 +296,39 @@ private static boolean isExecuteInBatchMode( } return !existsUnboundedSource; } + + public static Configuration getConfig(StreamExecutionEnvironment env, TableConfig tableConfig) { + try { + StreamExecutionEnvironment readEnv = getRealEnvironment(env); + Configuration config = + PythonDependencyUtils.configurePythonDependencies( + readEnv.getCachedFiles(), getMergedConfiguration(readEnv, tableConfig)); + config.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId()); + return config; + } catch (NoSuchFieldException + | IllegalAccessException + | NoSuchMethodException + | InvocationTargetException e) { + throw new TableException("Method getConfig failed.", e); + } + } + + private static StreamExecutionEnvironment getRealEnvironment(StreamExecutionEnvironment env) + throws NoSuchFieldException, IllegalAccessException { + Field realExecEnvField = + DummyStreamExecutionEnvironment.class.getDeclaredField("realExecEnv"); + realExecEnvField.setAccessible(true); + while (env instanceof DummyStreamExecutionEnvironment) { + env = (StreamExecutionEnvironment) realExecEnvField.get(env); + } + return env; + } + + private static Configuration getMergedConfiguration( Review comment: What about remove this method to avoid confusing with getMergedConfig? ########## File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java ########## @@ -215,6 +228,19 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact } } + private static boolean isPythonOperator(Transformation<?> transform) { + if (transform instanceof OneInputTransformation) { + return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory()); + } else if (transform instanceof TwoInputTransformation) { + return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory()); + } else if (transform instanceof AbstractMultipleInputTransformation) { + return isPythonOperator( + ((AbstractMultipleInputTransformation) transform).getOperatorFactory()); + } else { + return false; Review comment: Should throw an exception in this case as it means that we are missing something. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org