Biao Geng created FLINK-38500:
---------------------------------

             Summary: PyFlink jobs using the thread mode cannot run consecutively on a session cluster on MacOS
                 Key: FLINK-38500
                 URL: https://issues.apache.org/jira/browse/FLINK-38500
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.20.3
            Reporter: Biao Geng


We noticed that when the same PyFlink job using the thread mode is submitted 
consecutively to the same session cluster running Flink 1.20.3 on macOS, the 
following exception occurs:


{code:java}
Caused by: java.lang.RuntimeException: java.lang.UnsatisfiedLinkError: Native Library /opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/Python already loaded in another classloader
	at pemja.utils.CommonUtils.loadLibrary(CommonUtils.java:103)
	at pemja.utils.CommonUtils.loadPython(CommonUtils.java:45)
	at pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:365)
	at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:144)
	at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45)
	at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
	at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
	at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
	at org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsatisfiedLinkError: Native Library /opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/Python already loaded in another classloader
	at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2476)
	at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2705)
	at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2635)
	at java.base/java.lang.Runtime.load0(Runtime.java:768)
	at java.base/java.lang.System.load(System.java:1854)
	at pemja.utils.CommonUtils.loadLibrary(CommonUtils.java:101)
	... 19 more
{code}

The job used to reproduce the issue:

{code:python}
from pyflink.common import Configuration
from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment


def main():
    config = Configuration()
    # Thread mode runs the Python functions inside the TaskManager JVM via pemja.
    config.set_string("python.execution-mode", "thread")

    env = StreamExecutionEnvironment.get_execution_environment(config)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    env.from_collection([1, 2, 3, 4, 5]).map(lambda x: x + 1).print()
    env.execute()


if __name__ == "__main__":
    main()
{code}

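Reproducing the failure only requires submitting the job above twice to the same running session cluster. A minimal Python sketch of that loop, assuming the script is saved as {{repro_job.py}} (a hypothetical name) and submitted with the {{flink run -py}} CLI option; per the report, the second submission is the one expected to fail on macOS:

{code:python}
import shlex
import subprocess
from typing import List


def build_submit_command(flink_home: str, script: str) -> List[str]:
    """Build a `flink run` command that submits a PyFlink script."""
    # `-py` tells the Flink CLI that the job entry point is a Python file.
    return [f"{flink_home}/bin/flink", "run", "-py", script]


def submit_twice(flink_home: str, script: str) -> None:
    """Submit the same PyFlink job twice, one after the other."""
    cmd = build_submit_command(flink_home, script)
    for attempt in (1, 2):
        print(f"submission {attempt}: {shlex.join(cmd)}")
        # check=True raises CalledProcessError if the CLI exits non-zero.
        subprocess.run(cmd, check=True)
{code}

For example, {{submit_twice("/path/to/flink-1.20.3", "repro_job.py")}} against a session cluster started with {{bin/start-cluster.sh}}; the Flink home path here is a placeholder.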

This exception makes it tedious to verify PyFlink jobs on a session cluster, because the same job cannot simply be resubmitted.

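After some debugging, I found that the root cause lies in pemja (version 0.5.5): 
on macOS, {{pemja.utils.CommonUtils#loadPython}} does not handle classloading 
correctly when the Python interpreter's native library (the .so file, or the 
Python framework binary on macOS) is loaded more than once by different child 
classloaders in the same JVM (i.e. the same TaskManager instance). The JVM binds 
a given native library to a single classloader, so the second job's attempt to 
load it fails with the {{UnsatisfiedLinkError}} above. I will fix this in the 
pemja repository and publish new pemja artifacts.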

For those affected by this problem, one workaround is to move the 
flink-python jar from {{opt/}} to {{lib/}}. With this change, the Python 
interpreter dependency is loaded by the AppClassLoader instead of by a 
different child classloader for each job.
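
The workaround can be scripted. A minimal sketch, assuming the jar in {{opt/}} matches the pattern {{flink-python*.jar}} (e.g. {{flink-python-1.20.3.jar}}); the function name {{move_flink_python_to_lib}} is hypothetical:

{code:python}
import glob
import os
import shutil
from typing import List


def move_flink_python_to_lib(flink_home: str) -> List[str]:
    """Move flink-python jar(s) from <flink_home>/opt to <flink_home>/lib.

    Returns the destination paths of the moved jars.
    """
    opt_dir = os.path.join(flink_home, "opt")
    lib_dir = os.path.join(flink_home, "lib")
    # Assumed naming pattern, e.g. opt/flink-python-1.20.3.jar.
    jars = sorted(glob.glob(os.path.join(opt_dir, "flink-python*.jar")))
    if not jars:
        raise FileNotFoundError(f"no flink-python jar found in {opt_dir}")
    moved = []
    for jar in jars:
        dest = os.path.join(lib_dir, os.path.basename(jar))
        # Move rather than copy, as the workaround describes.
        shutil.move(jar, dest)
        moved.append(dest)
    return moved
{code}

Jars in {{lib/}} are put on the cluster classpath at startup, so the session cluster has to be restarted afterwards (e.g. {{bin/stop-cluster.sh}} followed by {{bin/start-cluster.sh}}).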



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
