Hi Jack, it seems you correctly enabled the checkpointing by calling `env.enableCheckpointing`. However, your UDFs have to either implement the Checkpointed interface or use the Key/Value State interface to make sure the state of the computation is snapshotted.
The documentation explains how to define your functions so that they checkpoint the state far better than I could in this post: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html I hope I've been of some help, I'll gladly help you further if you need it. On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > what seems to be the problem? > > Cheers, > Aljoscha > > On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com> wrote: > >> Hi all, >> >> I am doing a simple word count example and want to checkpoint the >> accumulated word counts. I am not having any luck getting the counts saved >> and restored. Can someone help? >> >> env.enableCheckpointing(1000) >> >> env.setStateBackend(new MemoryStateBackend()) >> >> >>> ... >> >> >> >> inStream >>> .keyBy({s => s}) >>> >>> >>> >>> *.mapWithState((in:String, count:Option[Int]) => { val newCount = >>> count.getOrElse(0) + 1 ((in, newCount), Some(newCount)) })* >>> .print() >> >> >> >> Thanks, >> >> Jack Huang >> > -- BR, Stefano Baghino Software Engineer @ Radicalbit