I'm replying to myself with the solution, in case someone else has the same question.
You only need to add a step that copies the jar from flink/opt to flink/lib. In my case:

StepConfig addGellyStep = new StepConfig()
        .withName("add-gelly-step")
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c",
                        "sudo cp /usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));

On Thu, Mar 12, 2020 at 9:43 AM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello,
>
> I'm trying to run a Flink job that works with graphs on a transient
> cluster in EMR. Here is my code:
>
> ----------
> HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()
>         .withJar("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + BUCKET_NAME
>                 + "/pugore-flink.jar /home/hadoop/pugore-flink.jar");
>
> StepConfig copyJarStep = new StepConfig()
>         .withName("Copy Jar")
>         .withHadoopJarStep(copyJarStepConf);
>
> HadoopJarStepConfig flinkJobConf = new HadoopJarStepConfig()
>         .withJar("command-runner.jar")
>         .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 1"
>                 + " --class es.pugore.flink.job.centrality.CentralityJob /home/hadoop/pugore-flink.jar"
>                 + " --alpha 0.05"
>                 + " --iterations 50"
>                 + " --input s3://" + BUCKET_NAME + "/" + key + "/edges.csv"
>                 + " --output s3://" + BUCKET_NAME + "/" + key + "/vertices-centrality.csv");
>
> StepConfig flinkRunJobStep = new StepConfig()
>         .withName("Flink job")
>         .withActionOnFailure("CONTINUE")
>         .withHadoopJarStep(flinkJobConf);
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
> stepConfigs.add(flinkRunJobStep);
>
> Application flink = new Application().withName("Flink");
>
> String clusterName = "flink-job-" + key;
> RunJobFlowRequest request = new RunJobFlowRequest()
>         .withName(clusterName)
>         .withReleaseLabel("emr-5.26.0")
>         .withApplications(flink)
>         .withServiceRole("EMR_DefaultRole")
>         .withJobFlowRole("EMR_EC2_DefaultRole")
>         .withLogUri("s3://" + BUCKET_NAME + "/" + key + "/logs")
>         .withInstances(new JobFlowInstancesConfig()
>                 .withInstanceCount(2)
>                 .withKeepJobFlowAliveWhenNoSteps(false)
>                 .withMasterInstanceType("m4.large")
>                 .withSlaveInstanceType("m4.large"))
>         .withSteps(stepConfigs);
>
> RunJobFlowResult result = getEmrClient().runJobFlow(request);
> String clusterId = result.getJobFlowId();
>
> log.debug("[" + key + "] cluster created with id: " + clusterId);
> ----------
>
> This code creates the cluster from scratch and launches my job. The job
> runs, but it fails with the following error:
>
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/graph/GraphAlgorithm
>
> On my local cluster I copy the flink-gelly jar from flink/opt to
> flink/lib and it works. Is there any way to do this automatically on a
> transient EMR cluster before launching the job?
>
> I know I can put the jar in S3, copy it from there as I do with my own
> jar in the first step, and then pass it on the classpath, but I'm
> wondering whether EMR can be instructed to include that dependency in
> some other way, maybe with some option in Application, Configuration,
> BootstrapAction or similar, since it is a Flink dependency.
>
> Thank you
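For completeness, here is a minimal sketch of how the new step fits into the
step list from my quoted code. The only important point is ordering: the copy
step must run before the Flink job step, since EMR executes steps in list
order. copyJarStep and flinkRunJobStep are the configs defined above; the
StepConfig class comes from the AWS SDK for Java v1
(com.amazonaws.services.elasticmapreduce.model).

----------
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.elasticmapreduce.model.StepConfig;

// Steps run in list order, so flink-gelly is already in flink/lib
// by the time "flink run" is executed.
List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);      // fetch the application jar from S3
stepConfigs.add(addGellyStep);     // copy flink-gelly from flink/opt to flink/lib
stepConfigs.add(flinkRunJobStep);  // run the Flink job on YARN
----------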