Hey,

Isn’t the explanation of the problem in the logs that you posted? Not enough 
memory. You have 2 EMR nodes with 8GB of memory each (of which YARN can 
allocate only ~6GB per node), while trying to allocate 1 TaskManager AND 1 
JobManager with a 6GB heap each.
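A quick back-of-the-envelope sketch of the budget (the numbers are taken from the YARN warning in your logs, which reports a single NodeManager with 6144MB allocatable; the EMR master node does not run a NodeManager by default):

```java
// Sketch of the memory budget implied by the posted logs.
// All figures come from the log output, not from independent measurement.
public class YarnMemoryBudget {
    public static void main(String[] args) {
        int nodeManagers = 1;        // "NodeManagers available: [6144]" - one NM
        int perNodeManagerMb = 6144; // YARN-allocatable memory per node (~6GB of 8GB)
        int jobManagerMb = 6144;     // jobmanager.heap.size: 6g
        int taskManagerMb = 6144;    // taskmanager.heap.size: 6g
        int numTaskManagers = 1;     // ClusterSpecification{numberTaskManagers=1}

        int required = jobManagerMb + numTaskManagers * taskManagerMb;
        int available = nodeManagers * perNodeManagerMb;
        System.out.println("required=" + required + "MB, available=" + available + "MB");
        // required=12288MB, available=6144MB: the JobManager alone fills the only
        // NodeManager, so the TaskManager can never be scheduled and the job
        // times out waiting for slots.
    }
}
```

Shrinking the heaps so that both containers fit (e.g. something like 1g for the JobManager and 4g for the TaskManager; hypothetical values, tune for your job) or adding more/larger core nodes should resolve it.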

Piotrek

> On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá 
> <amarti...@alto-analytics.com> wrote:
> 
> Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java code, 
> but I'm having some problems.
> 
> This is how I create the cluster:
> ------------------------------------------------------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + 
> "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
> 
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
> 
> Application flink = new Application().withName("Flink");
> 
> Configuration flinkConfiguration = new Configuration()
>     .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
> 
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("mysubnetid")
>         .withInstanceCount(2)
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
> 
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> ---------------------------------------------------------------------------------------------------------
> 
> And this is how I add the job when the cluster is ready:
> ------------------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 "
>             + "--class es.trendit.flink.job.centrality.CentralityJob "
>             + "/home/hadoop/trendit-flink-jobs.jar <args...>"));
> 
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
> 
> AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> -----------------------------------------------------------------------------------------------
> 
> In summary:
> - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> - jobmanager.heap.size and taskmanager.heap.size: 6g
> - taskmanager.numberOfTaskSlots: 2
> - run flink with --parallelism 2
> - so 1 EMR instance should be running the jobmanager and the other the 
> taskmanager with 2 slots available
> 
> But it fails after some time and I see this warning in the step stdout file:
> ----------------------------------------------------------------------------------------------------------------------
> 2020-03-31 14:37:47,288 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - This YARN 
> session requires 12288MB of memory in the cluster. There are currently only 
> 6144MB available.
> The Flink YARN client will try to allocate the YARN session, but maybe not 
> all TaskManagers are connecting from the beginning because the resources are 
> currently not available in the cluster. The allocation might take more time 
> than usual because the Flink YARN client needs to wait until the resources 
> become available.
> 2020-03-31 14:37:47,294 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - There is not 
> enough memory available in the YARN cluster. The TaskManager(s) require 
> 6144MB each. NodeManagers available: [6144]
> After allocating the JobManager (6144MB) and (0/1) TaskManagers, the 
> following NodeManagers are available: [0]
> The Flink YARN client will try to allocate the YARN session, but maybe not 
> all TaskManagers are connecting from the beginning because the resources are 
> currently not available in the cluster. The allocation might take more time 
> than usual because the Flink YARN client needs to wait until the resources 
> become available.
> 2020-03-31 14:37:47,296 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
> specification: ClusterSpecification{masterMemoryMB=6144, 
> taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> ----------------------------------------------------------------------------------------------------------------------
>  
> 
> And this error in the step stderr file:
> ----------------------------------------------------------------------------------------------------------------------
>  
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> ...
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 23 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate enough slots within timeout of 300000 ms to run the job. 
> Please make sure that the cluster has enough resources.
> at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> ...
> ----------------------------------------------------------------------------------------------------------------------
> 
> It looks to me like the TaskManager is not created at the beginning. Any idea 
> why this is happening and how to solve it? I could not find any relevant 
> information in the Flink docs.
> 
> Thanks
> 
> 
