Hi Gordon,

I was using a Flink session that lasted as long as the plan jar was running (which I believe is a "per job yarn cluster"), submitted to EMR with a command like:

    flink run -m yarn-cluster -yn 5 [jar] [jar arguments]
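For concreteness, a sketch of the two submission modes in question (the session commands follow the Flink 1.x CLI; the cluster size of 5 just matches my setup):

    # Per-job YARN cluster: one Flink cluster per submitted plan jar
    flink run -m yarn-cluster -yn 5 [jar] [jar arguments]

    # YARN session + regular "flink run": start a long-lived session,
    # then submit jobs against it
    yarn-session.sh -n 5
    flink run [jar] [jar arguments]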
Cheers,
Geoffrey

On Fri, Feb 17, 2017 at 12:09 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Geoffrey,
>
> Thanks for investigating and updating on this. Good to know that it is
> working!
>
> Just to clarify, was your series of jobs submitted to a “yarn session +
> regular bin/flink run”, or a “per job yarn cluster”?
> I’m asking just to make sure of the limitations Robert mentioned.
>
> Cheers,
> Gordon
>
> On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com) wrote:
>
> Hi Robert,
>
> Thanks for your reply. I've done some further testing and (hopefully)
> solved the issue; it turned out to be a red herring. After discovering
> that the same issue manifested itself when testing on my local machine,
> I found that multiple jobs can be submitted from a main() function on
> both temporary and permanent Flink YARN clusters, and that the issue
> was not with Flink or with YARN, but with my job file.
>
> In one part of my job, I need to fill in missing components of a vector
> with zeroes. I did this by combining the vector DataSet with another
> DataSet of indexed zeroes using a union operation and an aggregation
> operation. In my problematic job, I used
> ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList
> of Tuples, each holding an index and a zero. However, for input files
> with large parameters I needed to generate very long DataSets of
> zeroes, and since I was using fromElements, the client had to send the
> Flink runtime every element of the DataSet (lots and lots of zeroes).
> This caused the job submission to time out before execution, making me
> think that the job had not been properly received by the runtime.
>
> I've replaced this with ExecutionEnvironment#generateSequence and a map
> function that turns each number of the generated sequence into a tuple
> containing an index and a zero. This has solved the issue and my job
> seems to be running fine for now. (A sketch of this change is at the
> end of this thread.)
> (
> https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
> )
>
> Again, thank you very much for your help.
>
> Sincerely,
> Geoffrey
>
> On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Geoffrey,
>
> I think the "per job yarn cluster" feature probably does not work for
> one main() function submitting multiple jobs.
> If you have a yarn session + regular "flink run", it should work.
>
> On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <geof...@gmail.com> wrote:
>
> Just to clarify, is Flink designed to allow submitting multiple jobs
> from a single program class when using a YARN cluster? I wasn't sure
> based on the documentation.
>
> Cheers,
> Geoffrey
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <geof...@gmail.com> wrote:
>
> Hello all,
>
> I'm running a Flink plan made up of multiple jobs. The source for my
> job can be found here if it would help in any way:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> Each of the jobs (except for the first) depends on files generated by
> the previous job; I'm running it on an AWS EMR cluster using YARN.
>
> When I submit the plan file, the first job runs as planned. After it
> completes, the second job is submitted by the YARN client:
>
> <snip>
> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
> 2017-02-09 16:40:26,470 INFO org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
> 2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
> 2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
> 2017-02-09 16:40:26,480 INFO org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
>
> If the input file is small and the first job runs quickly (~1 minute
> works for me), then the second job runs fine. However, if the input
> file for my first job is large and the first job takes more than a
> minute or so to complete, Flink will not acknowledge receiving the next
> job: the Flink web console does not show any new jobs, and the Flink
> logs do not mention receiving any new jobs after the first job has
> completed. The YARN client's job submission times out after Flink does
> not respond:
>
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to
> configure and confirm the job submission.
>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
> I have tried increasing akka.client.timeout to large values such as
> 1200s (20 minutes), but even then Flink does not acknowledge or execute
> any other jobs and the same timeout error occurs. Does anyone know how
> I can get Flink to execute all of the jobs properly?
>
> Cheers,
> Geoffrey Mon
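A minimal, self-contained sketch of the fromElements -> generateSequence change described above (the class name, tuple types, and length n are hypothetical; the real code is at the R1DL.java link earlier in the thread):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ZeroPaddingSketch {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            final long n = 1_000_000L; // hypothetical vector length

            // Before: all n tuples are materialized on the client and
            // shipped to the JobManager as part of the job submission.
            // For large n, the submission can exceed akka.client.timeout.
            List<Tuple2<Long, Double>> zeroList = new ArrayList<>();
            for (long i = 0; i < n; i++) {
                zeroList.add(new Tuple2<>(i, 0.0));
            }
            DataSet<Tuple2<Long, Double>> zerosFromClient = env.fromCollection(zeroList);

            // After: only the sequence bounds travel with the job graph;
            // the zero tuples are generated in parallel on the cluster.
            DataSet<Tuple2<Long, Double>> zerosOnCluster = env
                    .generateSequence(0L, n - 1)
                    .map(new MapFunction<Long, Tuple2<Long, Double>>() {
                        @Override
                        public Tuple2<Long, Double> map(Long index) {
                            return new Tuple2<>(index, 0.0);
                        }
                    });

            zerosOnCluster.first(5).print();
        }
    }

The key point is that fromElements/fromCollection serializes every element into the job graph the client submits, while generateSequence ships only the bounds and defers element creation to the TaskManagers.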
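And for reference, the timeout named in the error message above is the client-side job submission timeout; it can be raised in the Flink configuration (the 1200 s value is the one tried in the thread, which only helps when the submission is merely slow rather than stalled):

    # conf/flink-conf.yaml (sketch; key name taken from the error message)
    akka.client.timeout: 1200 s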