Hi Gordon,

I was using a Flink session that lasted as long as the plan jar was still
running (which I believe would be a "per job yarn cluster"), by submitting
a command to EMR that looked like:
flink run -m yarn-cluster -yn 5 [jar] [jar arguments]

Cheers,
Geoffrey

On Fri, Feb 17, 2017 at 12:09 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Geoffrey,
>
> Thanks for investigating and updating on this. Good to know that it is
> working!
>
> Just to clarify, was your series of jobs submitted to a “yarn session +
> regular bin/flink run”, or “per job yarn cluster”?
> I’m asking just to make sure of the limitations Robert mentioned.
>
> Cheers,
> Gordon
>
>
> On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com)
> wrote:
>
> Hi Robert,
>
> Thanks for your reply. I've done some further testing and (hopefully)
> solved the issue; the multi-job submission turned out to be a red herring.
> After discovering that the same issue manifested itself when testing on my
> local machine, I found that multiple jobs can be submitted from a main()
> function for both temporary and permanent Flink YARN clusters, and that the
> issue was not with Flink or with YARN, but with my job file.
>
> In one part of my job, I need to fill in missing components of a vector
> with zeroes. I did this by combining the vector DataSet with another
> DataSet containing indexed zeroes using a union operation and an
> aggregation operation. In my problematic job, I used
> ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
> Tuples containing an index and a zero. However, for input files with very
> large parameters, I needed to generate very long DataSets of zeroes, and
> since I was using fromElements, the client needed to send the Flink runtime
> all of the elements with which to create the DataSet (lots and lots of
> zeroes). This caused the job submission to time out before execution,
> making me think that the job had not been properly received by the runtime.
>
> I've replaced this with ExecutionEnvironment#generateSequence and a map
> function mapping each number of the generated sequence to a tuple with a
> zero. This has solved the issue and my job seems to be running fine for now.
> (
> https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
> )
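
A rough way to see why this change helps is to compare what has to travel with the job in each case. The sketch below is a plain-Java analogy only: it does not use Flink or Flink's serializers, and the names PayloadSizeSketch, IndexedZero, materializedSize and boundsSize are hypothetical. It assumes that a fromElements-style source ships every element built on the client, while a generateSequence-style source ships only its two bounds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

class PayloadSizeSketch {
    /** Hypothetical stand-in for an (index, zero) tuple. */
    static final class IndexedZero implements Serializable {
        private static final long serialVersionUID = 1L;
        final long index;
        final double value = 0.0;

        IndexedZero(long index) {
            this.index = index;
        }
    }

    /** Size in bytes of an object under standard Java serialization. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** fromElements-style: all n elements are created up front and shipped. */
    static int materializedSize(int n) {
        ArrayList<IndexedZero> elements = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            elements.add(new IndexedZero(i));
        }
        return serializedSize(elements);
    }

    /** generateSequence-style: only the two bounds of the range are shipped. */
    static int boundsSize(long from, long to) {
        return serializedSize(new long[] {from, to});
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("materialized elements: " + materializedSize(n) + " bytes");
        System.out.println("range bounds only:     " + boundsSize(0, n - 1) + " bytes");
    }
}
```

The materialized payload grows with the number of zeroes, while the bounds stay the same size however long the sequence is. That matches the behaviour described above: the elements are produced on the cluster by the map function, not shipped from the client.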
>
> Again, thank you very much for your help.
>
> Sincerely,
> Geoffrey
>
> On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
> Hi Geoffrey,
>
> I think the "per job yarn cluster" feature probably does not work for one
> main() function submitting multiple jobs.
> If you have a yarn session + regular "flink run" it should work.
>
> On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <geof...@gmail.com> wrote:
>
> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <geof...@gmail.com> wrote:
>
> Hello all,
>
> I'm running a Flink plan made up of multiple jobs. The source for my job
> can be found here if it would help in any way:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> Each of the jobs (except for the first job) depends on files generated by
> the previous job; I'm running it on an AWS EMR cluster using YARN.
>
> When I submit the plan file, the first job runs as planned. After it
> completes, the second job is submitted by the YARN client:
>
> <snip>
> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
> 2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
> Waiting until all TaskManagers have connected
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
> TaskManager status (5/5)
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
> All TaskManagers are connected
> 2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
>
> If the input file is small and the first job runs quickly (~1 minute works
> for me), then the second job runs fine. However, if the input file for my
> first job is large and the first job takes more than a minute or so to
> complete, Flink will not acknowledge receiving the next job; the Flink web
> console does not show any new jobs and the Flink logs do not mention
> receiving any new jobs after the first job has completed. The YARN client's
> job submission times out when Flink does not respond:
>
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
> I have tried increasing akka.client.timeout to large values such as 1200s
> (20 minutes), but even then Flink does not acknowledge or execute any other
> jobs and there is the same timeout error. Does anyone know how I can get
> Flink to execute all of the jobs properly?
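
For readers who want to try the same setting, the timeout named in the exception can be set in the client's flink-conf.yaml. The fragment below is a sketch that assumes the "<number> s" duration format used by Flink 1.x configuration; the value is the 20 minutes mentioned above, not a recommendation.

```
# flink-conf.yaml (client side); assumed Flink 1.x duration syntax
akka.client.timeout: 1200 s
```

As this message reports, raising the value did not make the error go away here.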
>
> Cheers,
> Geoffrey Mon
>
>
>
