Hello,
I am about to deploy my first Flink project to production, but I am
running into a major hurdle: I cannot launch my job so that it writes
checkpoints to an S3 bucket. The project runs on an EMR cluster where I
have installed Flink 1.3.2. I am using YARN to launch the application, and
it seems to run fine until I enable checkpointing (with an S3 target). I
want to use RocksDB as my checkpointing backend. I have asked in a few
places and still have not found a solution to this problem. Below are my
steps for creating the cluster and launching the application; perhaps I am
missing a step. I'd be happy to provide any additional information if
needed.
AWS Portal:
1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster (I do fill out names/keys/etc.)
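For completeness, my understanding is that the same cluster could also be
created programmatically with the AWS SDK for Java 1.x; the Scala sketch
below is only an illustration of the console steps above, and the cluster
name, roles, key pair and instance types are placeholders rather than
values from my actual setup:

// Illustrative only: create an equivalent EMR 5.8.0 cluster (Hadoop only)
// via the AWS SDK for Java 1.x. Name, roles, key pair and instance types
// are placeholders for whatever the console wizard fills in.
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder
import com.amazonaws.services.elasticmapreduce.model.{Application, JobFlowInstancesConfig, RunJobFlowRequest}

object CreateEmrCluster extends App {
  val emr = AmazonElasticMapReduceClientBuilder.defaultClient()
  val request = new RunJobFlowRequest()
    .withName("flink-checkpoint-test")                       // placeholder name
    .withReleaseLabel("emr-5.8.0")                           // step 3
    .withApplications(new Application().withName("Hadoop"))  // step 4: Hadoop only
    .withServiceRole("EMR_DefaultRole")                      // default EMR roles
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withInstances(new JobFlowInstancesConfig()
      .withEc2KeyName("my-key")                              // placeholder key pair
      .withKeepJobFlowAliveWhenNoSteps(true)
      .withInstanceCount(3)
      .withMasterInstanceType("m4.large")                    // placeholder types
      .withSlaveInstanceType("m4.large"))
  println(emr.runJobFlow(request).getJobFlowId)
}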
Once the cluster is up, I SSH into the master node and do the following:
1 wget
http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2 tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3 cd flink-1.3.2
4 ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5 Change conf/flink-conf.yaml
6 ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar
In my conf/flink-conf.yaml I add the following fields:
state.backend: rocksdb
state.backend.fs.checkpointdir: s3:/bucket/location
state.checkpoints.dir: s3:/bucket/location
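As a sanity check that these values are actually picked up, I can print
what Flink resolves from the config directory with a small sketch like the
one below (my own debugging idea; the conf path assumes the tarball was
unpacked in the hadoop user's home directory):

// Quick sanity check: print the state-backend settings Flink resolves
// from flink-conf.yaml. The conf path is an assumption about where the
// Flink tarball was unpacked on the EMR master.
import org.apache.flink.configuration.GlobalConfiguration

object PrintCheckpointConfig extends App {
  val conf = GlobalConfiguration.loadConfiguration("/home/hadoop/flink-1.3.2/conf")
  println("state.backend = " + conf.getString("state.backend", "<unset>"))
  println("state.backend.fs.checkpointdir = " + conf.getString("state.backend.fs.checkpointdir", "<unset>"))
  println("state.checkpoints.dir = " + conf.getString("state.checkpoints.dir", "<unset>"))
}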
My program's checkpointing setup:
env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))
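Pulling the pieces together, the whole setup looks roughly like the
self-contained sketch below; the interval, pause and timeout values are
placeholders for whatever getCheckpointRate, getCheckpointMinPause and
getCheckpointTimeout return in the real job, and the bucket path is the
same placeholder as above:

// Self-contained sketch of the checkpointing setup described above.
// Numeric values and the bucket path are placeholders.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object CheckpointingJob extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)   // placeholder rate
  env.getCheckpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000L)     // placeholder pause
  env.getCheckpointConfig.setCheckpointTimeout(120000L)             // placeholder timeout
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

  // Incremental RocksDB checkpoints targeting the S3 bucket
  env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

  // Trivial pipeline just so the job graph is valid
  env.fromElements(1, 2, 3).map(_ * 2).print()

  env.execute("checkpointing-sketch")
}

In that sketch I am assuming the usual flink-streaming-scala_2.11 and
flink-statebackend-rocksdb_2.11 dependencies at version 1.3.2, the same
ones bundled into my actual job jar.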