Hello

I'm running Flink on Amazon EMR and I'm trying to submit several different
batch jobs to the cluster after creating it.

This is my cluster creation code:
----------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
            "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnetid")
        .withInstanceCount(2) // 1 for task manager + 1 for job manager
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
----------------------------------------------------------------

And this is how I add the jobs:
---------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "flink run -m yarn-cluster"
            + " --parallelism " + parallelism
            + " --class " + jobClass.getCanonicalName()
            + " /home/hadoop/flink-jobs.jar "
            + jobArguments));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
---------------------------------------------------------------------------------

And these are my jobs, submitted in this order (sketched after the list):

- Job1 - parallelism 1
- Job2 - parallelism 1
- Job3 - parallelism 2
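
For context, each job is submitted as its own EMR step right after the cluster is created, roughly like this (a minimal sketch of my submission code; submitJob just wraps the add-step code above, and the job class names and argument variables are simplified):
---------------------------------------------------------------------------------
// Sketch only: submitJob wraps the AddJobFlowStepsRequest code shown above,
// so each job becomes one EMR step running "flink run -m yarn-cluster".
private void submitJob(String clusterId, Class<?> jobClass,
                       int parallelism, String jobArguments) {
    StepConfig runJobStep = new StepConfig()
        .withName("run-" + jobClass.getSimpleName())
        .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
            .withArgs("bash", "-c", "flink run -m yarn-cluster"
                + " --parallelism " + parallelism
                + " --class " + jobClass.getCanonicalName()
                + " /home/hadoop/flink-jobs.jar " + jobArguments));

    amazonClient.getEmrClient().addJobFlowSteps(new AddJobFlowStepsRequest()
        .withJobFlowId(clusterId)
        .withSteps(runJobStep));
}

// Submitted back to back (class names here are illustrative):
submitJob(clusterId, Job1.class, 1, job1Args); // parallelism 1
submitJob(clusterId, Job2.class, 1, job2Args); // parallelism 1
submitJob(clusterId, Job3.class, 2, job3Args); // parallelism 2
---------------------------------------------------------------------------------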

I'm using m4.large machines as slaves, so I have 2 cores available, and I
was expecting Job1 and Job2 to run in parallel, with Job3 starting once
both of them finish. What I see instead is that Job2 waits (in Pending
status) for Job1 to finish before it starts. Only one task manager is
created for Job1; when it finishes another one is created for Job2, and
then two are created for Job3.

Since I have 2 cores available, why isn't Job2 running on the other core
instead of waiting? Is there any way to configure this?

Thanks
