Hi,
could it be that your state is very small? In that case the state is not
actually stored in HDFS but on the job manager because writing it to HDFS
and storing a handle to that in the JobManager would be more expensive.

Cheers,
Aljoscha

On Mon, 18 Apr 2016 at 17:20 Jason Brelloch <jb.bc....@gmail.com> wrote:

> Hi everyone,
>
> I am trying to set up flink with a hdfs state backend.  I configured
> state.backend and state.backend.fs.checkpointdir parameters in the
> flink-conf.yaml.  I run the flink task and the checkpoint directories are
> created in hdfs, so it appears it can connect and talk to hdfs just fine.
> Unfortunately no files are ever created in the hdfs directory.  I checked
> that the state is being saved and restored from the task manager memory and
> that works fine, it just never writes to hdfs.
>
> Am I missing a step?  Do I need to do anything to force a write to hdfs?
> Does the state variable have to be a particular type to work with hdfs?
>
> This is what my snapshot functions look like:
>
>   override def restoreState (rState: scala.collection.mutable.HashMap[String, 
> String]): Unit = {    state = rState  }  override def 
> snapshotState(checkpointId: Long, checkpointTimestamp: Long): 
> scala.collection.mutable.HashMap[String, String] = {    state  }
>
>
> Thanks!
> -Jason
>
> P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
>

Reply via email to