I could not get it to work the way I wanted by setting
taskmanager.numberOfTaskSlots to 2, but I found a way to run the jobs in
parallel: since they are independent, I simply create a separate cluster
for each job.
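
For anyone trying the suggested option, here is a minimal sketch of the properties that would go into the "flink-conf" classification. It uses a plain Map instead of the AWS SDK Configuration class so that it runs standalone. The class name FlinkConfSketch and the parameter slotsPerTaskManager are hypothetical names used only for illustration, and the sketch does not show that the option makes the jobs run in parallel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FlinkConfSketch {

    // Builds the key/value pairs for the "flink-conf" classification.
    // slotsPerTaskManager is a hypothetical parameter used for illustration.
    static Map<String, String> flinkConfProperties(int slotsPerTaskManager) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("jobmanager.heap.size", "2048m");  // from the original configuration
        props.put("taskmanager.heap.size", "2048m"); // from the original configuration
        // Option suggested in the reply; by default a TaskManager offers one slot.
        props.put("taskmanager.numberOfTaskSlots",
                Integer.toString(slotsPerTaskManager));
        return props;
    }

    public static void main(String[] args) {
        // Print each entry as "key: value".
        flinkConfProperties(2)
                .forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

In the cluster creation code quoted below, each map entry would correspond to one addPropertiesEntry call on the flink-conf Configuration object.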

Thanks

On Mon, Mar 30, 2020 at 4:22 PM Gary Yao <g...@apache.org> wrote:

> Can you try setting the config option taskmanager.numberOfTaskSlots to 2?
> By default the TMs only offer one slot [1], independent of the number of
> CPU cores.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199
>
> On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
> amarti...@alto-analytics.com> wrote:
>
>> Hello
>>
>> I'm running Flink over Amazon EMR and I'm trying to send several
>> different batch jobs to the cluster after creating it.
>>
>> This is my cluster creation code:
>> ----------------------------------------------------------------
>> StepConfig copyJarStep = new StepConfig()
>>     .withName("copy-jar-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
>>             + "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>>
>> List<StepConfig> stepConfigs = new ArrayList<>();
>> stepConfigs.add(copyJarStep);
>>
>> Application flink = new Application().withName("Flink");
>>
>> Configuration flinkConfiguration = new Configuration()
>>     .withClassification("flink-conf")
>>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>>     .addPropertiesEntry("taskmanager.heap.size", "2048m");
>>
>> RunJobFlowRequest request = new RunJobFlowRequest()
>>     .withName("cluster-" + executionKey)
>>     .withReleaseLabel("emr-5.26.0")
>>     .withApplications(flink)
>>     .withConfigurations(flinkConfiguration)
>>     .withServiceRole("EMR_DefaultRole")
>>     .withJobFlowRole("EMR_EC2_DefaultRole")
>>     .withLogUri(getWorkPath() + "logs")
>>     .withInstances(new JobFlowInstancesConfig()
>>         .withEc2SubnetId("subnetid")
>>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>>         .withKeepJobFlowAliveWhenNoSteps(true)
>>         .withMasterInstanceType("m4.large")
>>         .withSlaveInstanceType("m4.large"))
>>     .withSteps(stepConfigs);
>>
>> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>>
>> ----------------------------------------------------------------------------------------------------------------
>>
>> And this is how I add the jobs:
>>
>> ---------------------------------------------------------------------------------
>> StepConfig runJobStep = new StepConfig()
>>     .withName("run-job-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>         .withArgs("bash", "-c", "flink run -m yarn-cluster"
>>             + " --parallelism " + parallelism
>>             + " --class " + jobClass.getCanonicalName()
>>             + " /home/hadoop/flink-jobs.jar "
>>             + jobArguments));
>>
>> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>>     .withJobFlowId(clusterId)
>>     .withSteps(runJobStep);
>>
>> AddJobFlowStepsResult result =
>>     amazonClient.getEmrClient().addJobFlowSteps(request);
>>
>> ---------------------------------------------------------------------------------
>>
>> And these are my jobs:
>>
>> - Job1 - parallelism 1
>> - Job2 - parallelism 1
>> - Job3 - parallelism 2
>>
>> I'm using an m4.large machine as the slave, so it has 2 cores, and I was
>> expecting Job1 and Job2 to run in parallel, followed by Job3 once Job1
>> and Job2 finish. What I see instead is that Job2 waits (Pending status)
>> for Job1 to finish before it starts. Only one task manager is created for
>> Job1; when it finishes, another one is created for Job2, and then 2 are
>> created for Job3.
>>
>> Since I have 2 cores available, why is Job2 not run on the other core
>> instead of waiting? Is there any way to configure this?
>>
>> Thanks
>>
>>
>>
>>
