Hello, I'm running Flink on Amazon EMR and I'm trying to submit several different batch jobs to the cluster after creating it.
This is my cluster creation code:

----------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
        .withName("copy-jar-step")
        .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
                        + "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
        .withClassification("flink-conf")
        .addPropertiesEntry("jobmanager.heap.size", "2048m")
        .addPropertiesEntry("taskmanager.heap.size", "2048m");

RunJobFlowRequest request = new RunJobFlowRequest()
        .withName("cluster-" + executionKey)
        .withReleaseLabel("emr-5.26.0")
        .withApplications(flink)
        .withConfigurations(flinkConfiguration)
        .withServiceRole("EMR_DefaultRole")
        .withJobFlowRole("EMR_EC2_DefaultRole")
        .withLogUri(getWorkPath() + "logs")
        .withInstances(new JobFlowInstancesConfig()
                .withEc2SubnetId("subnetid")
                .withInstanceCount(2) // 1 for task manager + 1 for job manager
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m4.large")
                .withSlaveInstanceType("m4.large"))
        .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
----------------------------------------------------------------

And this is how I add the jobs:

----------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
        .withName("run-job-step")
        .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "flink run -m yarn-cluster"
                        + " --parallelism " + parallelism
                        + " --class " + jobClass.getCanonicalName()
                        + " /home/hadoop/flink-jobs.jar " + jobArguments));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
        .withJobFlowId(clusterId)
        .withSteps(runJobStep);

AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
----------------------------------------------------------------

And these are my jobs:

- Job1 - parallelism 1
- Job2 - parallelism 1
- Job3 - parallelism 2

I'm using an m4.large machine as the slave node, so it has 2 cores, and I expected Job1 and Job2 to run in parallel and Job3 to start once both of them finish. What I see instead is that Job2 waits (Pending status) for Job1 to finish before it starts. I also see that only one task manager is created for Job1; when it finishes, another one is created for Job2, and then two are created for Job3.

Since I have 2 cores available, why isn't Job2 running on the other core instead of waiting? Is there any way to configure this?

Thanks
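For completeness, this is roughly the order in which I submit the three jobs. submitJob(...) is just a simplified placeholder for my wrapper around the AddJobFlowStepsRequest code shown above, and the class and argument names are illustrative, not my real ones:

----------------------------------------------------------------
// Simplified sketch of my submission order; each call adds one EMR step
// to the same cluster via the AddJobFlowStepsRequest code shown above.
// submitJob and the Job*/job*Args names are placeholders.
submitJob(clusterId, Job1.class, 1, job1Args); // parallelism 1
submitJob(clusterId, Job2.class, 1, job2Args); // parallelism 1
submitJob(clusterId, Job3.class, 2, job3Args); // parallelism 2
----------------------------------------------------------------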