Hi Geoffrey,

I think the "per-job YARN cluster" feature probably does not work for a
single main() function that submits multiple jobs.
If you use a YARN session plus regular "flink run" submissions, it should work.
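A rough sketch of that workaround (the parallelism, jar name, class name, and
"--stage" argument are placeholders, not part of your actual build): start a
long-running YARN session once, then submit each stage of the pipeline as its
own "flink run" against that session.

```shell
# Start a detached YARN session with 5 TaskManagers (number is just an example).
# The session handle is written to a properties file in /tmp, so later
# "flink run" invocations attach to this session automatically.
./bin/yarn-session.sh -n 5 -d

# Submit each stage as a separate job against the running session.
# r1dl.jar, the entry class, and "--stage" are hypothetical placeholders;
# each invocation is an independent job submission, so a slow first stage
# cannot stall the submission of the second.
./bin/flink run -c com.github.quinngroup.R1DL r1dl.jar --stage 1
./bin/flink run -c com.github.quinngroup.R1DL r1dl.jar --stage 2
```

Each "flink run" returns when its job finishes, so the shell script itself
enforces the "job N+1 reads the files job N wrote" ordering.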

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <geof...@gmail.com> wrote:

> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <geof...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm running a Flink plan made up of multiple jobs. The source for my job
>> can be found here if it would help in any way:
>> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
>> Each of the jobs (except for the first job) depends on files generated by
>> the previous job; I'm running it on an AWS EMR cluster using YARN.
>>
>> When I submit the plan file, the first job runs as planned. After it
>> completes, the second job is submitted by the YARN client:
>>
>> <snip>
>> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
>> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
>> 2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
>> Waiting until all TaskManagers have connected
>> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
>> TaskManager status (5/5)
>> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
>> All TaskManagers are connected
>> 2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
>> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
>>
>> If the input file is small and the first job runs quickly (~1 minute
>> works for me), then the second job runs fine. However, if the input file
>> for my first job is large and the first job takes more than a minute or so
>> to complete, Flink will not acknowledge receiving the next job; the web
>> Flink console does not show any new jobs and Flink logs do not mention
>> receiving any new jobs after the first job has completed. The YARN client's
>> job submission times out after Flink does not respond:
>>
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>
>> I have tried increasing akka.client.timeout to large values such as 1200s
>> (20 minutes), but even then Flink does not acknowledge or execute any
>> further jobs, and the same timeout error occurs. Does anyone know how I
>> can get Flink to execute all of the jobs properly?
>>
>> Cheers,
>> Geoffrey Mon
>>
>
