@Aljoscha: For this word count example I am using a Kafka topic as the input stream. The problem is that when I cancel the task and restart it, the task loses the word counts accumulated so far and starts counting from 1 again. Am I missing something basic here?
@Stefano: I also tried to implement the Checkpointed interface, but had no luck either. Canceling and restarting the task did not restore the state. Here is my class:

    inStream
      .flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
      .keyBy({s => s})
      .map(new StatefulCounter)

    class StatefulCounter extends RichMapFunction[String, (String,Int)] with Checkpointed[Integer] {
      private var count: Integer = 0

      def map(in: String): (String,Int) = {
        count += 1
        return (in, count)
      }
      def snapshotState(l: Long, l1: Long): Integer = {
        count
      }
      def restoreState(state: Integer) {
        count = state
      }
    }

Thanks,
Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
> My bad, thanks for pointing that out.
>
> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>> the *WithState() family of functions uses the Key/Value state interface
>> internally, so that should work.
>>
>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>>> Hi Jack,
>>>
>>> it seems you correctly enabled checkpointing by calling
>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>> Checkpointed interface or use the Key/Value State interface to make sure
>>> the state of the computation is snapshotted.
>>>
>>> The documentation explains how to define your functions so that they
>>> checkpoint their state far better than I could in this post:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>
>>> I hope I've been of some help; I'll gladly help you further if you need it.
>>>
>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> Hi,
>>>> what seems to be the problem?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com> wrote:
>>>>> Hi all,
>>>>>
>>>>> I am doing a simple word count example and want to checkpoint the
>>>>> accumulated word counts. I am not having any luck getting the counts saved
>>>>> and restored. Can someone help?
>>>>>
>>>>>     env.enableCheckpointing(1000)
>>>>>     env.setStateBackend(new MemoryStateBackend())
>>>>>     ...
>>>>>     inStream
>>>>>       .keyBy({s => s})
>>>>>       .mapWithState((in: String, count: Option[Int]) => {
>>>>>         val newCount = count.getOrElse(0) + 1
>>>>>         ((in, newCount), Some(newCount))
>>>>>       })
>>>>>       .print()
>>>>>
>>>>> Thanks,
>>>>> Jack Huang
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
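The replies in this thread turn on two distinctions. The first is between keyed (Key/Value) state, which the *WithState() functions use, and the operator state of the Checkpointed interface. Checkpointed state belongs to a parallel operator instance, not to a key, so the single `count` field in StatefulCounter is shared by every word that instance sees. The second is between restarting from a snapshot and restarting with empty state. Below is a minimal, dependency-free Scala model of both. It is not Flink's API, and the names `KeyedStateSketch`, `countFn`, `runKeyed` and `runSingleCounter` are hypothetical. `countFn` has the same shape as the function passed to `mapWithState` in the original question. `runKeyed` keeps one state value per key, as `keyBy` followed by `mapWithState` does. `runSingleCounter` keeps one counter for all words, like StatefulCounter. A "restart" is modelled as a new run whose initial state is either the last snapshot or empty.

```scala
// Hypothetical model for illustration only; it does not use or mimic Flink classes.
object KeyedStateSketch {

  // Same shape as the mapWithState function in the question:
  // (input, current state for this key) => (output, new state for this key)
  def countFn(in: String, count: Option[Int]): ((String, Int), Option[Int]) = {
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
  }

  // Keyed state: one Option[Int] per key. The returned map plays the role of a snapshot.
  def runKeyed(words: Seq[String], initial: Map[String, Int]): (Seq[(String, Int)], Map[String, Int]) = {
    var state = initial
    val out = words.map { w =>
      val (result, newState) = countFn(w, state.get(w))
      newState match {
        case Some(s) => state = state.updated(w, s) // store the new count for this key
        case None    => state = state - w           // None clears the state for this key
      }
      result
    }
    (out, state)
  }

  // Operator state: a single counter shared by all keys, like StatefulCounter's `count`.
  def runSingleCounter(words: Seq[String], initial: Int): (Seq[(String, Int)], Int) = {
    var count = initial
    val out = words.map { w => count += 1; (w, count) }
    (out, count)
  }

  def main(args: Array[String]): Unit = {
    val words = Seq("a", "b", "a")
    val (keyedOut, snapshot) = runKeyed(words, Map.empty)
    println(keyedOut)                         // List((a,1), (b,1), (a,2))
    println(runSingleCounter(words, 0)._1)    // List((a,1), (b,2), (a,3))

    // A new run that starts from the snapshot continues counting...
    println(runKeyed(Seq("a"), snapshot)._1)  // List((a,3))
    // ...while a new run that starts with empty state counts from 1 again.
    println(runKeyed(Seq("a"), Map.empty)._1) // List((a,1))
  }
}
```

With the input a, b, a, the keyed version produces per-word counts, while the single counter numbers every word it sees. In the model, only a run that starts from the previous snapshot continues the counts; one that starts empty reproduces the "counting from 1 again" symptom. In Flink itself, as far as we know, periodic checkpoints are what a running job uses to recover from failures, whereas resuming a job that was cancelled and resubmitted is done from a savepoint; the Flink documentation on savepoints is the place to confirm this for the version in use.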