Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance with 15 GB of RAM, which I would have expected to be enough. I have gotten the program to run successfully on a personal virtual cluster where each node has 8 GB of RAM and where the master node was also a worker node, so the problem appears to have something to do with YARN's memory management (as used on EMR).
Nevertheless, it would probably be a good idea to modify my code to reduce its memory usage. When running my code on my local cluster, performance was probably bottlenecked by this memory usage. The job does use a for loop in the driver to run the core operations a specific number of times, specified as a command-line parameter. If it helps, here is my code:

Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260 is the core for loop)
Java: https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java (L120 is the core for loop)

I would expect the join operations to be a big cause of the excessive memory usage. (Rough sketches of the loop structure and the joins are at the bottom of this message, below the quoted thread.)

Thanks!
Geoffrey

On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <u...@apache.org> wrote:

> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay, who worked on that.
>
> The JVM GC error happens on the client side, as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> > on an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> > previous sequence. I have found that when the number of times to run this
> > sequence of operations is high (e.g. 20), I get this exception:
> >
> > Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-7]
> > shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
> > ActorSystem[flink]
> > java.lang.StackOverflowError
> > at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..............................
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> > the same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
> > at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
> > at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> > at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> > ............
> >
> > The problem seems to be caused by YARN's handling of memory, because I have
> > gotten the same Python implementation to work on a smaller, local virtual
> > cluster that is not using YARN, even though my local cluster has far fewer
> > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
> > using. After the YARN job has failed, sometimes a Python process is left on
> > the cluster using up most of the RAM.
> >
> > How can I solve this issue? I am unsure of how to reduce the number of
> > operations while keeping the same functionality.
> >
> > Thanks,
> > Geoffrey
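Since you asked about nested loops: below is a rough, self-contained Java sketch (not my actual R1DL code; the example data and the step() helper are made up) of the difference between what I do now, a driver-side for loop that re-applies the per-pass operations, and a Flink bulk iteration. With the for loop, the plan grows with the number of passes, which is presumably what overwhelms the client-side optimizer; with a bulk iteration the plan stays the same size. I'm not sure I can actually switch to this, since each of my passes already contains bulk iterations and I don't believe iterations can be nested, but it shows the structure:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class LoopSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final int numPasses = 20;  // stands in for the command-line parameter

        final DataSet<Double> initial = env.fromElements(1.0, 2.0, 3.0);

        // (a) Driver-side for loop (what I do now): every pass appends another copy
        // of the per-pass operators, so the plan the client must optimize and
        // serialize grows linearly with numPasses.
        DataSet<Double> current = initial;
        for (int t = 0; t < numPasses; t++) {
            current = step(current);
        }
        current.print();

        // (b) Bulk iteration: the loop is part of the dataflow itself, so the plan
        // has a fixed size no matter how many passes are requested.
        IterativeDataSet<Double> loop = initial.iterate(numPasses);
        DataSet<Double> result = loop.closeWith(step(loop));
        result.print();
    }

    // Made-up stand-in for one pass of the real joins/maps.
    private static DataSet<Double> step(DataSet<Double> in) {
        return in.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double value) {
                return value * 0.5;
            }
        });
    }
}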
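And on the joins: I haven't tried this yet, but if the hash joins are what is using most of the memory, I believe the Java DataSet API lets me pass a JoinHint (or use joinWithTiny/joinWithHuge) to steer the join strategy. A rough sketch with made-up inputs, not my real datasets:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Made-up inputs standing in for the real datasets: a large set of
        // (index, value) entries and a much smaller one joined against it each pass.
        DataSet<Tuple2<Long, Double>> big = env.fromElements(
                new Tuple2<>(0L, 1.0), new Tuple2<>(1L, 2.0));
        DataSet<Tuple2<Long, Double>> small = env.fromElements(new Tuple2<>(0L, 0.5));

        // Default: the optimizer picks the strategy, often a hash join whose build
        // side is held in managed memory.
        DataSet<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> byDefault =
                big.join(small).where(0).equalTo(0);

        // Hinted: a repartition sort-merge join can spill while sorting instead of
        // growing an in-memory hash table.
        DataSet<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> sortMerge =
                big.join(small, JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0);

        // If one side is known to be tiny, joinWithTiny broadcasts it instead.
        DataSet<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> broadcast =
                big.joinWithTiny(small).where(0).equalTo(0);

        sortMerge.print();
    }
}

The trade-off, as I understand it, is speed for a smaller in-memory footprint, since the sort-merge strategy can spill to disk.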