I could not make it work as I wanted by setting taskmanager.numberOfTaskSlots to 2, but I found a way to run the jobs in parallel: I simply create a separate cluster for each job, since they are independent.
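In case it is useful to someone, this is roughly what I do now for each job. It is a simplified sketch, not my exact code: PerJobClusterLauncher and launchClusterForJob are names I made up for this example, and the copy-jar step, subnet and log settings from the code below are omitted for brevity:

----------------------------------------------------------------
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

public class PerJobClusterLauncher {

    private final AmazonElasticMapReduce emr;

    public PerJobClusterLauncher(AmazonElasticMapReduce emr) {
        this.emr = emr;
    }

    // Launches one transient EMR cluster that runs a single Flink job.
    // With keepJobFlowAliveWhenNoSteps(false) the cluster terminates on
    // its own once the step finishes, so each job gets its own machines
    // and none of them has to wait for a slot.
    public String launchClusterForJob(String name, String jobClassName,
                                      int parallelism, String jobArgs) {
        StepConfig runJobStep = new StepConfig()
            .withName("run-" + name)
            .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
            .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                // the copy-jar step from the thread below must still run
                // first so that /home/hadoop/flink-jobs.jar exists
                .withArgs("bash", "-c", "flink run -m yarn-cluster"
                    + " --parallelism " + parallelism
                    + " --class " + jobClassName
                    + " /home/hadoop/flink-jobs.jar " + jobArgs));

        RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("cluster-" + name)
            .withReleaseLabel("emr-5.26.0")
            .withApplications(new Application().withName("Flink"))
            .withServiceRole("EMR_DefaultRole")
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withInstances(new JobFlowInstancesConfig()
                .withInstanceCount(2) // 1 for job manager + 1 for task manager
                .withKeepJobFlowAliveWhenNoSteps(false)
                .withMasterInstanceType("m4.large")
                .withSlaveInstanceType("m4.large"))
            .withSteps(runJobStep);

        return emr.runJobFlow(request).getJobFlowId();
    }
}
----------------------------------------------------------------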
Thanks

On Mon, Mar 30, 2020 at 4:22 PM Gary Yao <g...@apache.org> wrote:

> Can you try to set the config option taskmanager.numberOfTaskSlots to 2?
> By default the TMs only offer one slot [1], independent of the number of
> CPU cores.
>
> Best,
> Gary
>
> [1] https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199
>
> On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá
> <amarti...@alto-analytics.com> wrote:
>
>> Hello,
>>
>> I'm running Flink on Amazon EMR and I'm trying to send several
>> different batch jobs to the cluster after creating it.
>>
>> This is my cluster creation code:
>> ----------------------------------------------------------------
>> StepConfig copyJarStep = new StepConfig()
>>     .withName("copy-jar-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
>>             + "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>>
>> List<StepConfig> stepConfigs = new ArrayList<>();
>> stepConfigs.add(copyJarStep);
>>
>> Application flink = new Application().withName("Flink");
>>
>> Configuration flinkConfiguration = new Configuration()
>>     .withClassification("flink-conf")
>>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>>     .addPropertiesEntry("taskmanager.heap.size", "2048m");
>>
>> RunJobFlowRequest request = new RunJobFlowRequest()
>>     .withName("cluster-" + executionKey)
>>     .withReleaseLabel("emr-5.26.0")
>>     .withApplications(flink)
>>     .withConfigurations(flinkConfiguration)
>>     .withServiceRole("EMR_DefaultRole")
>>     .withJobFlowRole("EMR_EC2_DefaultRole")
>>     .withLogUri(getWorkPath() + "logs")
>>     .withInstances(new JobFlowInstancesConfig()
>>         .withEc2SubnetId("subnetid")
>>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>>         .withKeepJobFlowAliveWhenNoSteps(true)
>>         .withMasterInstanceType("m4.large")
>>         .withSlaveInstanceType("m4.large"))
>>     .withSteps(stepConfigs);
>>
>> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>> ----------------------------------------------------------------
>>
>> And this is how I add the jobs:
>> ----------------------------------------------------------------
>> StepConfig runJobStep = new StepConfig()
>>     .withName("run-job-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>         .withArgs("bash", "-c", "flink run -m yarn-cluster"
>>             + " --parallelism " + parallelism
>>             + " --class " + jobClass.getCanonicalName()
>>             + " /home/hadoop/flink-jobs.jar "
>>             + jobArguments));
>>
>> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>>     .withJobFlowId(clusterId)
>>     .withSteps(runJobStep);
>>
>> AddJobFlowStepsResult result =
>>     amazonClient.getEmrClient().addJobFlowSteps(request);
>> ----------------------------------------------------------------
>>
>> And these are my jobs:
>>
>> - Job1 - parallelism 1
>> - Job2 - parallelism 1
>> - Job3 - parallelism 2
>>
>> I'm using m4.large machines as slaves, so I have 2 cores in each, and I
>> was expecting Job1 and Job2 to run in parallel, with Job3 starting once
>> Job1 and Job2 finish. Instead, what I see is that Job2 waits (Pending
>> status) for Job1 to finish before starting.
>> I see that only one task manager is created for Job1; when it finishes,
>> another one is created for Job2, and then two are created for Job3.
>>
>> Since I have 2 cores available, why is Job2 not running on the other
>> core instead of waiting? Is there any way to configure this?
>>
>> Thanks
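For reference, Gary's suggestion would translate into one extra entry in the "flink-conf" classification of the cluster creation code above. This is just a sketch of that change (in my case it did not behave as I expected, see the reply at the top):

----------------------------------------------------------------
import com.amazonaws.services.elasticmapreduce.model.Configuration;

// Same "flink-conf" classification as in the cluster creation code,
// plus Gary's suggested slot count: 2 slots per task manager.
Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
----------------------------------------------------------------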