Hi Geoffrey,

I think the "per-job YARN cluster" feature probably does not work for a single main() function that submits multiple jobs. If you use a YARN session plus a regular "flink run" for each job, it should work.
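A rough sketch of that workaround (the session size, jar name, entry-point classes, and HDFS paths below are placeholders, not taken from your project): start one long-lived YARN session, then submit each stage as its own "flink run", so no single main() has to drive multiple jobs:

```shell
# Start a detached YARN session (placeholder: 5 TaskManagers).
# The session writes its connection details to a properties file that
# subsequent "flink run" invocations pick up automatically.
./bin/yarn-session.sh -n 5 -d

# Submit each stage as a separate job against the running session.
# Entry-point classes and paths are hypothetical placeholders.
./bin/flink run -c com.github.quinngroup.Stage1 r1dl.jar hdfs:///input hdfs:///stage1-out
./bin/flink run -c com.github.quinngroup.Stage2 r1dl.jar hdfs:///stage1-out hdfs:///stage2-out
```

Each "flink run" blocks until its job finishes, so the second stage only starts once the first stage's output files exist.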
On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <geof...@gmail.com> wrote:
> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <geof...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm running a Flink plan made up of multiple jobs. The source for my job
>> can be found here if it would help in any way:
>> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
>> Each of the jobs (except for the first job) depends on files generated by
>> the previous job; I'm running it on an AWS EMR cluster using YARN.
>>
>> When I submit the plan file, the first job runs as planned. After it
>> completes, the second job is submitted by the YARN client:
>>
>> <snip>
>> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
>> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
>> 2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
>> Waiting until all TaskManagers have connected
>> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
>> TaskManager status (5/5)
>> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
>> All TaskManagers are connected
>> 2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
>> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
>>
>> If the input file is small and the first job runs quickly (~1 minute
>> works for me), then the second job runs fine.
>> However, if the input file for my first job is large and the first job
>> takes more than a minute or so to complete, Flink will not acknowledge
>> receiving the next job; the web Flink console does not show any new jobs
>> and Flink logs do not mention receiving any new jobs after the first job
>> has completed. The YARN client's job submission times out after Flink
>> does not respond:
>>
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>
>> I have tried increasing akka.client.timeout to large values such as 1200s
>> (20 minutes), but even then Flink does not acknowledge or execute any other
>> jobs and there is the same timeout error. Does anyone know how I can get
>> Flink to execute all of the jobs properly?
>>
>> Cheers,
>> Geoffrey Mon
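One note on the akka.client.timeout attempt quoted above: the setting has to reach the client and JobManager before the cluster is started, so on YARN it is passed as a dynamic property at session start-up (or set in conf/flink-conf.yaml) rather than per job. A sketch using the 1200s value from the thread (which, as described above, did not resolve this particular hang; the TaskManager count is a placeholder):

```shell
# Raise the client-side job submission timeout for the whole session.
# Dynamic properties must be supplied when the YARN cluster is started;
# editing conf/flink-conf.yaml before start-up has the same effect.
./bin/yarn-session.sh -n 5 -d -Dakka.client.timeout=1200s
```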