I reply to myself with the solution in case someone else is having the same
question

It is only needed to add a copy command to copy the jar from flink/opt to
flink/lib, in my case:

StepConfig addGellyStep = new StepConfig()
        .withName("add-gelly-step")
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "sudo cp
/usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));



On Thu, Mar 12, 2020 at 9:43 AM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello,
>
> I'm trying to run a flink job that works with graphs in a transient
> cluster in EMR, here is my code:
>
> ----------
>     HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()
>             .withJar("command-runner.jar")
>             .withArgs("bash", "-c", "aws s3 cp s3://" + BUCKET_NAME +
> "/pugore-flink.jar /home/hadoop/pugore-flink.jar");
>
>     StepConfig copyJarStep = new StepConfig()
>             .withName("Copy Jar")
>             .withHadoopJarStep(copyJarStepConf);
>
>     HadoopJarStepConfig flinkJobConf = new HadoopJarStepConfig()
>             .withJar("command-runner.jar")
>             .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 1"
>                     + " --class
> es.pugore.flink.job.centrality.CentralityJob /home/hadoop/pugore-flink.jar"
>                     + " --alpha 0.05"
>                     + " --iterations 50"
>                     + " --input s3://" + BUCKET_NAME + "/" + key +
> "/edges.csv"
>                     + " --output s3://" + BUCKET_NAME + "/" + key +
> "/vertices-centrality.csv");
>
>     StepConfig flinkRunJobStep = new StepConfig()
>             .withName("Flink job")
>             .withActionOnFailure("CONTINUE")
>             .withHadoopJarStep(flinkJobConf);
>
>     List<StepConfig> stepConfigs = new ArrayList<>();
>     stepConfigs.add(copyJarStep);
>     stepConfigs.add(flinkRunJobStep);
>
>     Application flink = new Application().withName("Flink");
>
>     String clusterName = "flink-job-" + key;
>     RunJobFlowRequest request = new RunJobFlowRequest()
>             .withName(clusterName)
>             .withReleaseLabel("emr-5.26.0")
>             .withApplications(flink)
>             .withServiceRole("EMR_DefaultRole")
>             .withJobFlowRole("EMR_EC2_DefaultRole")
>             .withLogUri("s3://" + BUCKET_NAME + "/" + key + "/logs")
>             .withInstances(new JobFlowInstancesConfig()
>                     .withInstanceCount(2)
>                     .withKeepJobFlowAliveWhenNoSteps(false)
>                     .withMasterInstanceType("m4.large")
>                     .withSlaveInstanceType("m4.large"))
>             .withSteps(stepConfigs);
>
>     RunJobFlowResult result = getEmrClient().runJobFlow(request);
>     String clusterId = result.getJobFlowId();
>
>     log.debug("[" + key + "] cluster created with id: " + clusterId);
> -------------------------
>
> This job creates the cluster from scratch and launches my job, it is
> executed but I'm getting the following error:
>
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/graph/GraphAlgorithm
>
> In my local cluster I copy the flink-gelly jar from flink/opt to flink/lib
> and it works, is there any way to do it automatically in a transient EMR
> cluster before launching the job?
>
> I know I can put the jar in S3 and copy it from there as I do with my jar
> in the first step and then use it as classpath, but I'm wondering if it is
> possible to instruct EMR to include that dependency in some way, maybe with
> some option in Application, Configuration,  BootstrapAction or any other...
> since it is a Flink dependency
>
> Thank you
>
>
>
>
>

Reply via email to