Hi Piotr, I don't have 2 task managers, just one with 2 slots. That would be OK according to my calculations, but as Craig said I need one more instance for the cluster master. I had assumed the job manager ran on the master and the task manager on the slave, but both the job manager and the task manager run on slaves, so I need 3 instances instead of the 2 I had guessed.
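For reference, a minimal sketch of the change that matches this conclusion (a config fragment only, reusing the same `JobFlowInstancesConfig` builder from my code quoted below; the subnet id and instance types are the same placeholders as before):

```java
// Sketch only: same JobFlowInstancesConfig as before, but with 3 instances
// (1 master + 2 core nodes), since the JobManager and the TaskManager each
// need a core node running a YARN NodeManager -- the master does not run one.
JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
        .withEc2SubnetId("mysubnetid")          // placeholder subnet id
        .withInstanceCount(3)                   // was 2
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large");
```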
Regards

On Wed, Apr 1, 2020 at 1:31 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hey,
>
> Isn’t the explanation of the problem in the logs that you posted? Not
> enough memory? You have 2 EMR nodes, 8GB memory each, while trying to
> allocate 2 TaskManagers AND 1 JobManager with 6GB heap size each?
>
> Piotrek
>
> > On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá
> > <amarti...@alto-analytics.com> wrote:
> >
> > Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java
> > code, but I'm having some problems.
> >
> > This is how I create the cluster:
> >
> > ------------------------------------------------------------------------------------------------------------
> > StepConfig copyJarStep = new StepConfig()
> >     .withName("copy-jar-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
> >
> > List<StepConfig> stepConfigs = new ArrayList<>();
> > stepConfigs.add(copyJarStep);
> >
> > Application flink = new Application().withName("Flink");
> >
> > Configuration flinkConfiguration = new Configuration()
> >     .withClassification("flink-conf")
> >     .addPropertiesEntry("jobmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.heap.size", "6g")
> >     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
> >
> > RunJobFlowRequest request = new RunJobFlowRequest()
> >     .withName("cluster-" + executionKey)
> >     .withReleaseLabel("emr-5.26.0")
> >     .withApplications(flink)
> >     .withConfigurations(flinkConfiguration)
> >     .withServiceRole("EMR_DefaultRole")
> >     .withJobFlowRole("EMR_EC2_DefaultRole")
> >     .withLogUri(getWorkPath() + "logs")
> >     .withInstances(new JobFlowInstancesConfig()
> >         .withEc2SubnetId("mysubnetid")
> >         .withInstanceCount(2)
> >         .withKeepJobFlowAliveWhenNoSteps(true)
> >         .withMasterInstanceType("m4.large")
> >         .withSlaveInstanceType("m4.large"))
> >     .withSteps(stepConfigs);
> >
> > RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> > ---------------------------------------------------------------------------------------------------------
> >
> > And this is how I add the job when the cluster is ready:
> >
> > ------------------------------------------------------------------------------------------
> > StepConfig runJobStep = new StepConfig()
> >     .withName("run-job-step")
> >     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> >     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> >         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 --class es.trendit.flink.job.centrality.CentralityJob /home/hadoop/trendit-flink-jobs.jar <args...>"));
> >
> > AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
> >     .withJobFlowId(clusterId)
> >     .withSteps(runJobStep);
> >
> > AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> > -----------------------------------------------------------------------------------------------
> >
> > As a summary:
> > - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> > - jobmanager.heap.size and taskmanager.heap.size: 6g
> > - taskmanager.numberOfTaskSlots: 2
> > - run flink with --parallelism 2
> > - so 1 EMR instance should be running the jobmanager and the other the
> >   taskmanager with 2 slots available
> >
> > But it fails after some time and I see this warning in the step stdout
> > file:
> > ----------------------------------------------------------------------------------------------------------------------
> > 2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor
> >     - This YARN session requires 12288MB of memory in the cluster.
> >       There are currently only 6144MB available.
> > The Flink YARN client will try to allocate the YARN session, but maybe
> > not all TaskManagers are connecting from the beginning because the
> > resources are currently not available in the cluster. The allocation
> > might take more time than usual because the Flink YARN client needs to
> > wait until the resources become available.
> > 2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor
> >     - There is not enough memory available in the YARN cluster. The
> >       TaskManager(s) require 6144MB each. NodeManagers available: [6144]
> > After allocating the JobManager (6144MB) and (0/1) TaskManagers, the
> > following NodeManagers are available: [0]
> > The Flink YARN client will try to allocate the YARN session, but maybe
> > not all TaskManagers are connecting from the beginning because the
> > resources are currently not available in the cluster. The allocation
> > might take more time than usual because the Flink YARN client needs to
> > wait until the resources become available.
> > 2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor
> >     - Cluster specification: ClusterSpecification{masterMemoryMB=6144,
> >       taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> > ----------------------------------------------------------------------------------------------------------------------
> >
> > And this error in the step stderr file:
> > ----------------------------------------------------------------------------------------------------------------------
> > org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
> >     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> >     ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> >     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> >     ... 23 more
> > Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Could not allocate enough slots within timeout of 300000 ms to run the
> > job. Please make sure that the cluster has enough resources.
> >     at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
> >     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> >     ...
> > ----------------------------------------------------------------------------------------------------------------------
> >
> > It looks to me like the TaskManager is not created at the beginning; any
> > idea why this is happening and how to solve it? I could not find any
> > relevant information in the Flink docs.
> >
> > Thanks
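A small sketch of the arithmetic behind the quoted warning, assuming (as the logs suggest with "NodeManagers available: [6144]") that each m4.large core node offers 6144MB to YARN and that the master node runs no NodeManager:

```java
// Sketch of the memory arithmetic behind the YARN warning above.
// Assumption: a YARN NodeManager on an m4.large EMR node offers 6144MB,
// and NodeManagers run only on core (slave) nodes, not on the master.
public class YarnMemoryCheck {
    static final int JOB_MANAGER_MB = 6144;   // jobmanager.heap.size: 6g
    static final int TASK_MANAGER_MB = 6144;  // taskmanager.heap.size: 6g
    static final int NODE_MANAGER_MB = 6144;  // per core node (assumed)

    /** Memory the Flink YARN session asks for: 1 JobManager + n TaskManagers. */
    static int requiredMb(int taskManagers) {
        return JOB_MANAGER_MB + taskManagers * TASK_MANAGER_MB;
    }

    /** Memory YARN can offer: core nodes only (master hosts no NodeManager). */
    static int availableMb(int instanceCount) {
        return (instanceCount - 1) * NODE_MANAGER_MB;
    }

    public static void main(String[] args) {
        // With 2 instances: 12288MB required, 6144MB available -> allocation stalls.
        System.out.println(requiredMb(1) + "MB required, "
                + availableMb(2) + "MB available with 2 instances");
        // With 3 instances: 12288MB required, 12288MB available -> it fits.
        System.out.println(requiredMb(1) + "MB required, "
                + availableMb(3) + "MB available with 3 instances");
    }
}
```

This matches the log line "This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available" and the conclusion that a third instance is needed.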