dianfu commented on a change in pull request #12370: URL: https://github.com/apache/flink/pull/12370#discussion_r433644752
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala ########## @@ -150,6 +152,64 @@ trait CommonPythonBase { } realEnv } + + private def isPythonWorkerUsingManagedMemory(config: Configuration): Boolean = { + val clazz = loadClass("org.apache.flink.python.PythonOptions") + config.getBoolean(clazz.getField("USE_MANAGED_MEMORY").get(null) + .asInstanceOf[ConfigOption[java.lang.Boolean]]) + } + + private def getPythonWorkerMemory(config: Configuration): MemorySize = { + val clazz = loadClass("org.apache.flink.python.PythonOptions") + val pythonFrameworkMemorySize = MemorySize.parse( + config.getString( + clazz.getField("PYTHON_FRAMEWORK_MEMORY_SIZE").get(null) + .asInstanceOf[ConfigOption[String]])) + val pythonBufferMemorySize = MemorySize.parse( + config.getString( + clazz.getField("PYTHON_DATA_BUFFER_MEMORY_SIZE").get(null) + .asInstanceOf[ConfigOption[String]])) + pythonFrameworkMemorySize.add(pythonBufferMemorySize) + } + + private def checkPythonWorkerMemory( + config: Configuration, env: StreamExecutionEnvironment = null): Unit = { + if (!isPythonWorkerUsingManagedMemory(config)) { + val taskOffHeapMemory = config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY) + val requiredPythonWorkerOffHeapMemory = getPythonWorkerMemory(config) + if (taskOffHeapMemory.compareTo(requiredPythonWorkerOffHeapMemory) < 0) { + throw new TableException(String.format("The configured Task Off-Heap Memory %s is less " + + "than the least required Python worker Memory %s. The Task Off-Heap Memory can be " + + "configured using the configuration key 'taskmanager.memory.task.off-heap.size'.", + taskOffHeapMemory, requiredPythonWorkerOffHeapMemory)) + } + } else if (env != null && isRocksDbUsingManagedMemory(env)) { + throw new TableException("Currently it doesn't support to use Managed Memory for both " + + "RocksDB state backend and Python worker at the same time. 
You can either configure " + + "RocksDB state backend to use Task Off-Heap Memory via the configuration key " + + "'state.backend.rocksdb.memory.managed' or configure Python worker to use " + + "Task Off-Heap Memory via the configuration key " + + "'python.fn-execution.memory.managed'.") + } + } + + private def isRocksDbUsingManagedMemory(env: StreamExecutionEnvironment): Boolean = { Review comment: Do you mean the duplicate methods between the old planner and the blink planner? If so, I'm afraid that's currently not possible, as there is no way to share code between them. Although it is technically possible to share the code by moving it to flink-table-common, I don't think that's a good idea in this case. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org