Yep, that was the problem.

Thanks!

On Mon, Apr 18, 2016 at 11:36 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> could it be that your state is very small? In that case the state is not
> actually stored in HDFS but kept on the JobManager, because writing it to
> HDFS and storing a handle to that file in the JobManager would be more
> expensive than shipping the state itself.
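>
> If you want even small state to end up as files in HDFS, a minimal sketch
> along these lines should work, assuming the FsStateBackend constructor
> overload that takes a file-state size threshold (check that your version
> has it):
>
>   import java.net.URI
>   import org.apache.flink.runtime.state.filesystem.FsStateBackend
>   import org.apache.flink.streaming.api.scala._
>
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   // The second argument is the file-state size threshold in bytes: snapshots
>   // smaller than this travel with the checkpoint metadata to the JobManager.
>   // A threshold of 0 forces every snapshot out to HDFS as a file.
>   env.setStateBackend(new FsStateBackend(new URI("hdfs:///flink/checkpoints"), 0))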
>
> Cheers,
> Aljoscha
>
> On Mon, 18 Apr 2016 at 17:20 Jason Brelloch <jb.bc....@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I am trying to set up Flink with an HDFS state backend.  I configured the
>> state.backend and state.backend.fs.checkpointdir parameters in
>> flink-conf.yaml.  When I run the Flink job, the checkpoint directories are
>> created in HDFS, so it appears Flink can connect and talk to HDFS just
>> fine.  Unfortunately, no files are ever created in the HDFS directory.  I
>> verified that the state is being saved and restored from task manager
>> memory, and that works fine; it just never writes to HDFS.
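>>
>> For reference, the relevant flink-conf.yaml entries look roughly like this
>> (the namenode host, port, and path are placeholders):
>>
>>   state.backend: filesystem
>>   state.backend.fs.checkpointdir: hdfs://namenode:9000/flink/checkpoints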
>>
>> Am I missing a step?  Do I need to do anything to force a write to HDFS?
>> Does the state variable have to be a particular type to work with HDFS?
>>
>> This is what my snapshot functions look like:
>>
>>   override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
>>     state = rState
>>   }
>>
>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
>>     state
>>   }
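>>
>> These overrides come from Flink's Checkpointed interface; for completeness,
>> checkpointing also has to be enabled on the environment (the interval here
>> is just an example):
>>
>>   // take a snapshot every 5000 ms
>>   env.enableCheckpointing(5000)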
>>
>>
>> Thanks!
>> -Jason
>>
>> P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
>>
>
