Hi Niels,

yes, in YARN mode, the default parallelism is the number of available task slots.
You can change the default task parallelism like this:
1) Use the -p parameter when submitting a job via the CLI client [1]
2) Set a parallelism on the execution environment: env.setParallelism() [2]

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#execution-environment-level

2016-08-23 10:29 GMT+02:00 Niels Basjes <ni...@basjes.nl>:
> I did more digging and finally understand what goes wrong.
> I create a yarn-session with 50 slots.
> Then I run my job, which (because my HBase table has 100s of regions) has
> a lot of input splits.
> The job then runs with parallelism 50 because I did not specify a value.
> As a consequence, the second job I start in the same yarn-session is faced
> with 0 available task slots and fails with this exception:
>
> 08/23/2016 09:58:52 Job execution switched to status FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: ...... Resources available to scheduler:
> Number of instances=5, total number of slots=50, available slots=0
>
> So my conclusion for now is that if you want to run batch jobs in a
> yarn-session, you MUST specify the parallelism for all steps; otherwise one
> job fills the yarn-session completely and you cannot run multiple jobs in
> parallel.
>
> Is this conclusion correct?
>
> Niels Basjes
>
>
> On Fri, Aug 19, 2016 at 3:18 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Niels,
>>
>> In Flink, you don't need one task per file, since splits are assigned
>> lazily to the reading tasks.
>> What exactly is the error you are getting when trying to read that many
>> input splits? (Is it on the JobManager?)
>>
>> Regards,
>> Robert
>>
>> On Thu, Aug 18, 2016 at 1:56 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> I'm working on a batch process using Flink and I ran into an interesting
>>> problem.
>>> The number of input splits in my job is really, really large.
>>>
>>> I currently have an HBase input (with more than 1000 regions), and in the
>>> past I have worked with MapReduce jobs reading 2000+ files.
>>>
>>> The problem I have is that if I run such a job in a "small" yarn-session
>>> (i.e. fewer than 1000 task slots) I get a fatal error indicating that
>>> there are not enough resources.
>>> For a continuous streaming job this makes sense, yet for a batch job
>>> (like the one I'm running) this is an undesirable error.
>>>
>>> For my HBase situation I currently have a workaround: I override the
>>> createInputSplits method of the TableInputFormat and thus control the
>>> input splits that are created.
>>>
>>> What is the correct way to solve this (no, my cluster is NOT big enough
>>> to run that many parallel tasks)?
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
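
For completeness, a minimal sketch of Fabian's option 2, using the Java DataSet API of that time. The parallelism value (10) and the toy input are placeholders, not values from this thread:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ParallelismExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Default parallelism for every operator of this job; the value is a
            // placeholder chosen so the job does not occupy all slots of a shared
            // yarn-session.
            env.setParallelism(10);

            DataSet<String> data = env.fromElements("a", "b", "c");
            data.print(); // print() triggers execution of the DataSet program
        }
    }

Option 1 is the command-line equivalent, along the lines of ./bin/flink run -p 10 yourJob.jar against the running yarn-session (yourJob.jar is a placeholder; see [1] for the exact flags).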
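
And a sketch of the "specify the parallelism for all steps" approach from Niels's conclusion: giving each operator of the batch job an explicit parallelism so that one job cannot claim every slot of the shared session. The functions and the value 25 are made up for illustration; only the setParallelism() calls matter here:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class CappedBatchJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> lines = env.fromElements("row1", "row2", "row3");

            // Illustrative cap: in a 50-slot session, limiting each step of this
            // job to 25 leaves slots free for a second job running in parallel.
            DataSet<Integer> lengths = lines
                    .map(new MapFunction<String, Integer>() {
                        @Override
                        public Integer map(String value) {
                            return value.length();
                        }
                    })
                    .setParallelism(25);

            // Sinks get an explicit parallelism as well, otherwise they fall back
            // to the default and can again claim all available slots.
            lengths.writeAsText("/tmp/lengths-out").setParallelism(1);

            env.execute("batch job capped at 25 slots");
        }
    }

With a 50-slot session, capping each job well below 50 (or submitting with -p) leaves slots free, so a second job does not fail with the NoResourceAvailableException quoted above.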