Yep, that was the problem. Thanks!
On Mon, Apr 18, 2016 at 11:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> could it be that your state is very small? In that case the state is not
> actually stored in HDFS but on the JobManager, because writing it to HDFS
> and storing a handle to it in the JobManager would be more expensive.
>
> Cheers,
> Aljoscha
>
> On Mon, 18 Apr 2016 at 17:20 Jason Brelloch <jb.bc....@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I am trying to set up Flink with an HDFS state backend. I configured the
>> state.backend and state.backend.fs.checkpointdir parameters in
>> flink-conf.yaml. When I run the Flink job, the checkpoint directories are
>> created in HDFS, so it appears Flink can connect and talk to HDFS just
>> fine. Unfortunately, no files are ever created in the HDFS directory. I
>> verified that the state is being saved to and restored from task manager
>> memory, and that works fine; it just never writes to HDFS.
>>
>> Am I missing a step? Do I need to do anything to force a write to HDFS?
>> Does the state variable have to be a particular type to work with HDFS?
>>
>> These are my snapshot functions:
>>
>> override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
>>   state = rState
>> }
>>
>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
>>   state
>> }
>>
>> Thanks!
>> -Jason
>>
>> P.S. I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
>>
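
[Editor's note for later readers: a minimal flink-conf.yaml sketch of the two settings discussed in the thread. The hdfs:// path is a placeholder, not taken from the thread. Per Aljoscha's explanation, small state is kept with the checkpoint metadata on the JobManager rather than written as separate files in HDFS, so an empty checkpoint directory does not necessarily indicate a misconfiguration.]

```yaml
# Use the filesystem state backend so checkpoints go to a distributed FS.
state.backend: filesystem
# Directory for checkpoint data; the path below is illustrative only.
state.backend.fs.checkpointdir: hdfs://namenode:9000/flink/checkpoints
```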