Hello again, thanks for giving my advice a shot anyway, but Aljoscha is far more knowledgeable than me regarding Flink. :)
I hope I'm not getting mixed up again, but I think gracefully canceling your job means you lose your job state. Am I right in saying that the state is preserved in case of abnormal termination (e.g. the JobManager crashes) or if you explicitly create a savepoint?

On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com> wrote:

> @Aljoscha:
> For this word count example I am using a kafka topic as the input stream.
> The problem is that when I cancel the task and restart it, the task loses
> the accumulated word counts so far and starts counting from 1 again. Am I
> missing something basic here?
>
> @Stefano:
> I also tried to implement the Checkpointed interface but had no luck
> either. Canceling and restarting the task did not restore the states. Here
> is my class:
>
> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>     .keyBy({s => s})
>     .map(new StatefulCounter)
>
> class StatefulCounter extends RichMapFunction[String, (String,Int)] with Checkpointed[Integer] {
>     private var count: Integer = 0
>
>     def map(in: String): (String,Int) = {
>         count += 1
>         return (in, count)
>     }
>     def snapshotState(l: Long, l1: Long): Integer = {
>         count
>     }
>     def restoreState(state: Integer) {
>         count = state
>     }
> }
>
> Thanks,
>
> Jack Huang
>
> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>
>> My bad, thanks for pointing that out.
>>
>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> the *withState() family of functions use the Key/Value state interface
>>> internally, so that should work.
>>>
>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>>>
>>>> Hi Jack,
>>>>
>>>> it seems you correctly enabled the checkpointing by calling
>>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>> the state of the computation is snapshotted.
>>>>
>>>> The documentation explains how to define your functions so that they
>>>> checkpoint the state far better than I could in this post:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>
>>>> I hope I've been of some help, I'll gladly help you further if you need
>>>> it.
>>>>
>>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> what seems to be the problem?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am doing a simple word count example and want to checkpoint the
>>>>>> accumulated word counts. I am not having any luck getting the counts
>>>>>> saved and restored. Can someone help?
>>>>>>
>>>>>> env.enableCheckpointing(1000)
>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> inStream
>>>>>>     .keyBy({s => s})
>>>>>>     .mapWithState((in: String, count: Option[Int]) => {
>>>>>>         val newCount = count.getOrElse(0) + 1
>>>>>>         ((in, newCount), Some(newCount))
>>>>>>     })
>>>>>>     .print()
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jack Huang
>>>>
>>>> --
>>>> BR,
>>>> Stefano Baghino
>>>>
>>>> Software Engineer @ Radicalbit
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit