Hi Fabian,

1) I have looked at the linked docs, and from what I can tell no setup should be needed to get Flink working (other than downloading the correct binaries, which I believe I did).
2) I have downloaded the Flink 1.3.2 binaries (flink-1.3.2-bin-hadoop27-scala_2.11.tgz <http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz>). This build is for Hadoop 2.7.x, which matches EMR 5.8.0.
I appreciate any help or guidance you can provide in fixing my problem; please let me know if there is anything else I can provide. Thank you

On Mon, Oct 2, 2017 at 4:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Andy,
>
> I'm not an AWS expert, so I'll just check on some common issues.
>
> I guess you already had a look at the Flink docs for AWS/EMR, but I'll post
> the link just to be sure [1].
>
> Since you are using Flink 1.3.2 (EMR 5.8.0 comes with Flink 1.3.1), did you
> build Flink yourself or did you download the binaries?
> Does the Hadoop version of the Flink build match the Hadoop version of EMR
> 5.8.0, i.e., Hadoop 2.7.x?
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html
>
> 2017-10-02 21:51 GMT+02:00 Andy M. <ajm2...@gmail.com>:
>
> > Hi Fabian,
> >
> > Sorry, I just realized I forgot to include that part. The error returned
> > is:
> >
> > java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
> >     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
> >     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
> >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
> >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> >     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> >     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
> >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
> >     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
> >
> > I believe it has something to do with the classpath, but I am unsure why
> > or how to fix it.
> > The classpath being used during the execution is:
> >
> > /home/hadoop/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/home/hadoop/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/home/hadoop/flink-1.3.2/lib/log4j-1.2.17.jar:/home/hadoop/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/home/hadoop/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar::/etc/hadoop/conf:
> >
> > I decompiled flink-shaded-hadoop2-uber-1.3.2.jar and the addResource
> > function does seem to be there.
> >
> > Thank you
> >
> > On Mon, Oct 2, 2017 at 3:43 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Andy,
> > >
> > > Can you describe in more detail what exactly isn't working?
> > > Do you see error messages in the log files or on the console?
> > >
> > > Thanks, Fabian
> > >
> > > 2017-10-02 15:52 GMT+02:00 Andy M. <ajm2...@gmail.com>:
> > >
> > > > Hello,
> > > >
> > > > I am about to deploy my first Flink projects to production, but I am
> > > > running into a very big hurdle. I am unable to launch my project so it
> > > > can write to an S3 bucket. My project is running on an EMR cluster,
> > > > where I have installed Flink 1.3.2. I am using YARN to launch the
> > > > application, and it seems to run fine unless I try to enable
> > > > checkpointing (with an S3 target). I am looking to use RocksDB as my
> > > > checkpointing backend. I have asked in a few places, and I am still
> > > > unable to find a solution to this problem. Here are my steps for
> > > > creating a cluster and launching my application; perhaps I am missing
> > > > a step. I'd be happy to provide any additional information if needed.
> > > >
> > > > AWS Portal:
> > > >
> > > > 1) EMR -> Create Cluster
> > > > 2) Advanced Options
> > > > 3) Release = emr-5.8.0
> > > > 4) Only select Hadoop 2.7.3
> > > > 5) Next -> Next -> Next -> Create Cluster (I do fill out names/keys/etc.)
> > > >
> > > > Once the cluster is up I ssh into the master and do the following:
> > > >
> > > > 1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
> > > > 2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
> > > > 3  cd flink-1.3.2
> > > > 4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
> > > > 5  Change conf/flink-conf.yaml
> > > > 6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar
> > > >
> > > > In conf/flink-conf.yaml I add the following fields:
> > > >
> > > > state.backend: rocksdb
> > > > state.backend.fs.checkpointdir: s3://bucket/location
> > > > state.checkpoints.dir: s3://bucket/location
> > > >
> > > > My program's checkpointing setup:
> > > >
> > > > env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
> > > > env.getCheckpointConfig.enableExternalizedCheckpoints(
> > > >   ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> > > > env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
> > > > env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
> > > > env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> > > > env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))