Please also post the job you're trying to run.

On 17.07.2016 08:43, Geoffrey Mon wrote:
The Java program I used to test DistributedCache was faulty since it actually created the cache from files on the machine on which the program was running (i.e. the worker node).

I tried implementing a cluster again, this time using two actual machines instead of virtual machines. I found the same error of the Python libraries and plan file not being found in the temporary directory. Has anyone else been able to successfully set up a Flink cluster to run Python jobs? I've been beginning to suspect that there may be some issues with running Python jobs on Flink clusters that are present in Flink.

Cheers,
Geoffrey

On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com <mailto:geof...@gmail.com>> wrote:

    I wrote a simple Java plan that reads a file in the distributed
    cache and uses the first line from that file in a map operation.
    Sure enough, it works locally, but fails when the job is sent to a
    taskmanager on a worker node. Since DistributedCache seems to work
    for everyone else, I'm thinking that maybe some sort of file
    permissions are not properly set such that Flink is not able to
    successfully write distributed cache files.

    I used inotify-tools to watch the temporary files directory on
    both the master node and worker node. When the plan is being
    prepared, the jobmanager node wrote the Python modules and plan
    file to the temporary files directory. However, on the worker
    node, the directory tree was created, but the job failed before
    any of the module or plan files were even attempted to be written.
    Interestingly enough, there were no error messages or warnings
    about the cache.

    Cheers,
    Geoffrey

    On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler
    <ches...@apache.org <mailto:ches...@apache.org>> wrote:

        Could you write a java job that uses the Distributed cache to
        distribute files?

        If this fails then the DC is faulty, if it doesn't something
        in the Python API is wrong.


        On 15.07.2016 08:06, Geoffrey Mon wrote:
        I've come across similar issues when trying to set up Flink
        on Amazon EC2 instances. Presumably there is something wrong
        with my setup? Here is the flink-conf.yaml I am using:
        
https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml

        Thanks,
        Geoffrey

        On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon
        <geof...@gmail.com <mailto:geof...@gmail.com>> wrote:

            Hello,

            Here is the TaskManager log on pastebin:
            http://pastebin.com/XAJ56gn4

            I will look into whether the files were created.

            By the way, the cluster is made with virtual machines
            running on BlueData EPIC. I don't know if that might be
            related to the problem.

            Thanks,
            Geoffrey


            On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler
            <ches...@apache.org <mailto:ches...@apache.org>> wrote:

                Hello Geoffrey,

                How often does this occur?

                Flink distributes the user-code and the python
                library using the Distributed Cache.

                Either the file is deleted right after being created
                for some reason, or the DC returns a file name before
                the file was created (which shouldn't happen, it
                should block it is available).

                If you are up to debugging this i would suggest
                looking into FileCache class and verifying whether
                the file in question is in fact created.

                The logs of the TaskManager of which the exception
                occurs could be of interest too; could you send them
                to me?

                Regards,
                Chesnay


                On 13.07.2016 04:11, Geoffrey Mon wrote:
                Hello all,

                I've set up Flink on a very small cluster of one
                master node and five worker nodes, following the
                instructions in the documentation
                
(https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
                I can run the included examples like WordCount and
                PageRank across the entire cluster, but when I try
                to run simple Python examples, I sometimes get a
                strange error on the first PythonMapPartition about
                the temporary folders that contain the streams of
                data between Python and Java.

                If I run jobs on only the taskmanager on the master
                node, Python examples run fine. However, if the jobs
                use the worker nodes, then I get the following error:

                org.apache.flink.client.program.ProgramInvocationException:
                The program execution failed: Job execution failed.
                at
                
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
                <snip>
                Caused by:
                org.apache.flink.runtime.client.JobExecutionException:
                Job execution failed.
                at
                
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
                <snip>
                Caused by: java.lang.Exception: The user defined
                'open()' method caused an exception: External
                process for task MapPartition (PythonMap) terminated
                prematurely.
                python: can't open file
                
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
                [Errno 2] No such file or directory
                at
                
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
                <snip>
                Caused by: java.lang.RuntimeException: External
                process for task MapPartition (PythonMap) terminated
                prematurely.
                python: can't open file
                
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
                [Errno 2] No such file or directory
                at
                
org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
                at
                
org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
                at
                
org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
                at
                
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
                at
                
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
                ... 5 more

                I'm suspecting this issue has something to do with
                the data sending between the master and the workers,
                but I haven't been able to find any solutions.
                Presumably the temporary files weren't received
                properly and thus were not created properly?

                Thanks in advance.

                Cheers,
                Geoffrey




Reply via email to