Hey, isn't the explanation of the problem right there in the logs you posted? Not enough memory? You have 2 EMR nodes with 8GB of memory each, while trying to allocate 2 TaskManagers AND 1 JobManager with a 6GB heap size each?
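To spell the arithmetic out (a quick sketch; the numbers are taken straight from the YARN warning quoted below):

```java
public class MemoryCheck {
    public static void main(String[] args) {
        // From the log: only one NodeManager is reported, offering 6144 MB
        // ("NodeManagers available: [6144]").
        int availableMb = 6144;

        // From flink-conf: jobmanager.heap.size = 6g, taskmanager.heap.size = 6g
        int jobManagerMb = 6 * 1024;
        int taskManagerMb = 6 * 1024;

        int requiredMb = jobManagerMb + taskManagerMb;
        System.out.println("required=" + requiredMb + "MB, available=" + availableMb + "MB");
        // prints: required=12288MB, available=6144MB
        // 12288 > 6144, which is exactly the "requires 12288MB ... only 6144MB
        // available" warning: the TaskManager container can never be allocated,
        // and the job times out waiting for slots.
    }
}
```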
Piotrek

> On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá <amarti...@alto-analytics.com> wrote:
>
> Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java code,
> but I'm having some problems.
>
> This is how I create the cluster:
> ------------------------------------------------------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>         .withName("copy-jar-step")
>         .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>         .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>                 .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
>                         + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>         .withClassification("flink-conf")
>         .addPropertiesEntry("jobmanager.heap.size", "6g")
>         .addPropertiesEntry("taskmanager.heap.size", "6g")
>         .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>         .withName("cluster-" + executionKey)
>         .withReleaseLabel("emr-5.26.0")
>         .withApplications(flink)
>         .withConfigurations(flinkConfiguration)
>         .withServiceRole("EMR_DefaultRole")
>         .withJobFlowRole("EMR_EC2_DefaultRole")
>         .withLogUri(getWorkPath() + "logs")
>         .withInstances(new JobFlowInstancesConfig()
>                 .withEc2SubnetId("mysubnetid")
>                 .withInstanceCount(2)
>                 .withKeepJobFlowAliveWhenNoSteps(true)
>                 .withMasterInstanceType("m4.large")
>                 .withSlaveInstanceType("m4.large"))
>         .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> ---------------------------------------------------------------------------------------------------------
>
> And this is how I add the job when the cluster is ready:
>
> ------------------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>         .withName("run-job-step")
>         .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>         .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>                 .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2"
>                         + " --class es.trendit.flink.job.centrality.CentralityJob"
>                         + " /home/hadoop/trendit-flink-jobs.jar <args...>"));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>         .withJobFlowId(clusterId)
>         .withSteps(runJobStep);
>
> AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> -----------------------------------------------------------------------------------------------
>
> As a summary:
> - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> - jobmanager.heap.size and taskmanager.heap.size: 6g
> - taskmanager.numberOfTaskSlots: 2
> - run flink with --parallelism 2
> - so 1 EMR instance should be running the jobmanager and the other the
>   taskmanager with 2 slots available
>
> But it fails after some time and I see this warning in the step stdout file:
> ----------------------------------------------------------------------------------------------------------------------
> 2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
> After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> ----------------------------------------------------------------------------------------------------------------------
>
> And this error in the step stderr file:
> ----------------------------------------------------------------------------------------------------------------------
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     ...
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 23 more
> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     ...
> ----------------------------------------------------------------------------------------------------------------------
>
> It looks to me like the TaskManager is not created at the beginning. Any idea
> why this is happening and how to solve it? I could not find any relevant
> information in the Flink docs.
>
> Thanks
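One way out (an untested sketch; the exact sizes here are guesses you will want to tune, and using larger instance types or a higher instance count would work too): shrink the heaps so that the JobManager and TaskManager containers together fit into the 6144 MB that the single NodeManager offers, e.g. in your flink-conf classification:

```java
// Hypothetical sizes: 1g (JM) + 4g (TM) = 5120 MB, which fits under the
// 6144 MB YARN reports for the one NodeManager. Adjust to your job's needs.
Configuration flinkConfiguration = new Configuration()
        .withClassification("flink-conf")
        .addPropertiesEntry("jobmanager.heap.size", "1g")
        .addPropertiesEntry("taskmanager.heap.size", "4g")
        .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
```

Alternatively, if I remember the CLI correctly, you can override the sizes per job with `flink run -m yarn-cluster -yjm <MB> -ytm <MB> ...` instead of changing the classification.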