Just to clarify, is Flink designed to allow submitting multiple jobs from a single program class when using a YARN cluster? I wasn't sure based on the documentation.
Cheers,
Geoffrey

On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <[email protected]> wrote:
> Hello all,
>
> I'm running a Flink plan made up of multiple jobs. The source for my job
> can be found here if it would help in any way:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> Each of the jobs (except for the first job) depends on files generated by
> the previous job; I'm running it on an AWS EMR cluster using YARN.
>
> When I submit the plan file, the first job runs as planned. After it
> completes, the second job is submitted by the YARN client:
>
> <snip>
> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
> 2017-02-09 16:40:26,470 INFO org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
> Waiting until all TaskManagers have connected
> 2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
> TaskManager status (5/5)
> 2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
> All TaskManagers are connected
> 2017-02-09 16:40:26,480 INFO org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
>
> If the input file is small and the first job runs quickly (~1 minute works
> for me), then the second job runs fine. However, if the input file for my
> first job is large and the first job takes more than a minute or so to
> complete, Flink will not acknowledge receiving the next job; the Flink web
> console does not show any new jobs, and the Flink logs do not mention
> receiving any new jobs after the first job has completed. The YARN client's
> job submission times out after Flink does not respond:
>
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
> I have tried increasing akka.client.timeout to large values such as 1200s
> (20 minutes), but even then Flink does not acknowledge or execute any other
> jobs and there is the same timeout error. Does anyone know how I can get
> Flink to execute all of the jobs properly?
>
> Cheers,
> Geoffrey Mon
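For reference, the client-side timeout named in the exception above is an ordinary key in flink-conf.yaml. The fragment below is a minimal sketch. It assumes that the configuration file read by the YARN client (normally the one in the Flink distribution's conf/ directory) is the file being edited. The 1200 s value is the one tried in the message above. As reported there, raising this timeout alone did not get the second job accepted.

```yaml
# flink-conf.yaml, as read by the client that submits the jobs
# Timeout for blocking calls made on the client side, including job submission.
# Flink's default is 60 s; 1200 s is the value tried in the message above.
akka.client.timeout: 1200 s
```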
