[ https://issues.apache.org/jira/browse/FLINK-33529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prabhu Joseph updated FLINK-33529:
----------------------------------
    Attachment: batch_wc.py

> PyFlink fails with "No module named 'cloudpickle'"
> -------------------------------------------------
>
>                 Key: FLINK-33529
>                 URL: https://issues.apache.org/jira/browse/FLINK-33529
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.18.0
>         Environment: Python 3.7.16 or Python 3.9
> YARN
>            Reporter: Prabhu Joseph
>            Priority: Major
>         Attachments: batch_wc.py, flink1.17-get_site_packages.py, 
> flink1.18-get_site_packages.py
>
>
> PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18. The same 
> program works fine on Flink 1.17. The regression appeared after the change in 
> FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).
> *Repro:*
> {code}
> [hadoop@ip-1-2-3-4 ~]$ python --version
> Python 3.7.16
> [hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
> flink-1.18.0-1.amzn2.x86_64
> [hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d
> [hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output s3://prabhuflinks3/OUT2/
> {code}
> *Error*
> {code}
> ModuleNotFoundError: No module named 'cloudpickle'
>       at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
>       at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
>       at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
>       at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
>       at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> {code}
> *Analysis*
> 1. On Flink 1.17 with Python 3.7.16, 
> PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two 
> paths:
> {code}
> [root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
> /tmp/lib/python3.7/site-packages
> /tmp/lib64/python3.7/site-packages
> {code}
> whereas Flink 1.18 (after FLINK-32034) changed 
> PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is 
> returned:
> {code}
> [root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
> /tmp/lib64/python3.7/site-packages
> [root@ip-172-31-45-97 tmp]#
> {code}
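> For illustration, here is a minimal standalone sketch of the kind of lookup 
> difference the attached flink1.17-get_site_packages.py / 
> flink1.18-get_site_packages.py scripts probe. It is an assumption, not a copy 
> of Flink's code: it assumes the 1.17 lookup was distutils-based (collecting 
> both the pure-Python and the platform-specific site-packages directory under a 
> prefix) while the 1.18 lookup is a single sysconfig query.
> {code}
> # Hedged illustration only; not the actual PythonEnvironmentManagerUtils code.
> import sys
> import sysconfig
> from distutils.sysconfig import get_python_lib  # still available on Python 3.7
> 
> prefix = sys.argv[1] if len(sys.argv) > 1 else "/tmp"
> 
> # Assumed 1.17-style lookup: both pure-Python and platform-specific dirs.
> old_paths = {
>     get_python_lib(plat_specific=False, prefix=prefix),  # e.g. <prefix>/lib/python3.7/site-packages
>     get_python_lib(plat_specific=True, prefix=prefix),   # e.g. <prefix>/lib64/python3.7/site-packages
> }
> 
> # Assumed 1.18-style lookup: one sysconfig query, which on lib/lib64-split
> # distros resolves to only one of the two directories.
> new_path = sysconfig.get_path("platlib", vars={"base": prefix, "platbase": prefix})
> 
> print("1.17-style:", sorted(old_paths))
> print("1.18-style:", new_path)
> {code}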
> 2. The PyFlink dependencies are installed in "/tmp/lib/python3.7/site-packages", 
> which is no longer returned by getSitePackagesPath in Flink 1.18, so the Python 
> worker cannot import cloudpickle and the PyFlink job fails.
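> A quick hedged check of the mechanism on the worker host (assuming, as in this 
> repro, that cloudpickle is installed only under the /tmp prefix and not in the 
> system site-packages):
> {code}
> # Hedged check: put the dropped directory back on sys.path and retry the
> # import that failed in the job.
> import importlib
> import sys
> 
> sys.path.append("/tmp/lib/python3.7/site-packages")  # path Flink 1.18 no longer returns
> importlib.import_module("cloudpickle")  # raises ModuleNotFoundError without the line above
> print("cloudpickle import OK")
> {code}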
> *Attached batch_wc.py, flink1.17-get_site_packages.py and 
> flink1.18-get_site_packages.py.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
