Hi Piotr,

I don't have 2 task managers, just one with 2 slots. That would be OK
according to my calculations, but as Craig said I need one more instance
for the cluster master. I was assuming the job manager would run on the
master and the task manager on the slave, but in fact both the job manager
and the task manager run on slaves, so I need 3 instances instead of the 2
I had planned.
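
For reference, a minimal sketch of the change (same AWS SDK for Java v1 EMR
classes as in my original code; the comments reflect my understanding, not
something I have tested yet):

```java
// Ask EMR for 3 EC2 instances: 1 master (runs the cluster/YARN master,
// hosts no Flink containers) and 2 slaves. Each m4.large slave offers
// about 6144MB to YARN, so the JobManager (6144MB) can go on one slave
// and the TaskManager (6144MB, 2 slots) on the other.
JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
    .withEc2SubnetId("mysubnetid")
    .withInstanceCount(3)  // was 2: the master left room for only 1 node
    .withKeepJobFlowAliveWhenNoSteps(true)
    .withMasterInstanceType("m4.large")
    .withSlaveInstanceType("m4.large");
```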

Regards

On Wed, Apr 1, 2020 at 1:31 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hey,
>
> Isn’t the explanation of the problem in the logs that you posted? Not
> enough memory? You have 2 EMR nodes, 8GB memory each, while trying to
> allocate 2 TaskManagers AND 1 JobManager with 6GB heap size each?
>
> Piotrek
>
> > On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá <
> amarti...@alto-analytics.com> wrote:
> >
> > Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from java
> code but I'm having some problems
> >
> > This is how I create the cluster:
> >
> ------------------------------------------------------------------------------------------------------------
> > StepConfig copyJarStep = new StepConfig()
> >     .withName("copy-jar-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >     .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
> >         "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
> >
> > List<StepConfig> stepConfigs = new ArrayList<>();
> > stepConfigs.add(copyJarStep);
> >
> > Application flink = new Application().withName("Flink");
> >
> > Configuration flinkConfiguration = new Configuration()
> >     .withClassification("flink-conf")
> >     .addPropertiesEntry("jobmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
> >
> > RunJobFlowRequest request = new RunJobFlowRequest()
> >     .withName("cluster-" + executionKey)
> >     .withReleaseLabel("emr-5.26.0")
> >     .withApplications(flink)
> >     .withConfigurations(flinkConfiguration)
> >     .withServiceRole("EMR_DefaultRole")
> >     .withJobFlowRole("EMR_EC2_DefaultRole")
> >     .withLogUri(getWorkPath() + "logs")
> >     .withInstances(new JobFlowInstancesConfig()
> >         .withEc2SubnetId("mysubnetid")
> >         .withInstanceCount(2)
> >         .withKeepJobFlowAliveWhenNoSteps(true)
> >         .withMasterInstanceType("m4.large")
> >         .withSlaveInstanceType("m4.large"))
> >     .withSteps(stepConfigs);
> >
> > RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> >
> ---------------------------------------------------------------------------------------------------------
> >
> > And this is how I add the job when the cluster is ready:
> >
> ------------------------------------------------------------------------------------------
> > StepConfig runJobStep = new StepConfig()
> >     .withName("run-job-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2"
> >             + " --class es.trendit.flink.job.centrality.CentralityJob"
> >             + " /home/hadoop/trendit-flink-jobs.jar <args...>"));
> >
> > AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
> >     .withJobFlowId(clusterId)
> >     .withSteps(runJobStep);
> >
> > AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> >
> -----------------------------------------------------------------------------------------------
> >
> > In summary:
> > - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> > - jobmanager.heap.size and taskmanager.heap.size: 6g
> > - taskmanager.numberOfTaskSlots: 2
> > - run flink with --parallelism 2
> > - so 1 EMR instance should be running the jobmanager and the other the
> taskmanager with 2 slots available
> >
> > But it fails after some time and I see this warning in the step stdout
> file:
> >
> ----------------------------------------------------------------------------------------------------------------------
> > 2020-03-31 14:37:47,288 WARN
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - This YARN
> session requires 12288MB of memory in the cluster. There are currently only
> 6144MB available.
> > The Flink YARN client will try to allocate the YARN session, but maybe
> not all TaskManagers are connecting from the beginning because the
> resources are currently not available in the cluster. The allocation might
> take more time than usual because the Flink YARN client needs to wait until
> the resources become available.
> > 2020-03-31 14:37:47,294 WARN
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - There is
> not enough memory available in the YARN cluster. The TaskManager(s) require
> 6144MB each. NodeManagers available: [6144]
> > After allocating the JobManager (6144MB) and (0/1) TaskManagers, the
> following NodeManagers are available: [0]
> > The Flink YARN client will try to allocate the YARN session, but maybe
> not all TaskManagers are connecting from the beginning because the
> resources are currently not available in the cluster. The allocation might
> take more time than usual because the Flink YARN client needs to wait until
> the resources become available.
> > 2020-03-31 14:37:47,296 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster
> specification: ClusterSpecification{masterMemoryMB=6144,
> taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> >
> ----------------------------------------------------------------------------------------------------------------------
>
> >
> > And this error in the step stderr file:
> >
> ----------------------------------------------------------------------------------------------------------------------
>
> > org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
> > at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> > at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> > at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> > ... 23 more
> > Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots within timeout of 300000 ms to run the job.
> Please make sure that the cluster has enough resources.
> > at
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
> > at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> > ...
> >
> ----------------------------------------------------------------------------------------------------------------------
> >
> > It looks to me like the TaskManager is never created at the beginning.
> Any idea why this is happening and how to solve it? I could not find any
> relevant information in the Flink docs.
> >
> > Thanks
> >
> >
>
>
