@Aljoscha:
For this word count example I am using a Kafka topic as the input stream.
The problem is that when I cancel the task and restart it, the task loses
the word counts accumulated so far and starts counting from 1 again. Am I
missing something basic here?

@Stefano:
I also tried implementing the Checkpointed interface but had no luck
either. Canceling and restarting the task did not restore the state. Here
is my class:

inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
  .keyBy({s => s})
  .map(new StatefulCounter)


class StatefulCounter extends RichMapFunction[String, (String, Int)]
    with Checkpointed[Integer] {
  private var count: Integer = 0

  def map(in: String): (String, Int) = {
    count += 1
    (in, count)
  }

  // Returns the state to be stored in the checkpoint.
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = {
    count
  }

  // Receives the state of a previous checkpoint on restore.
  def restoreState(state: Integer): Unit = {
    count = state
  }
}
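
The per-key counting that both this class and the quoted `mapWithState` call aim for can be modelled in plain Scala, without Flink. The following is only a sketch: `KeyedCountSketch` and `runWithState` are hypothetical names, not part of any Flink API, and the state is kept in an ordinary in-memory map rather than in a checkpointed state backend.

```scala
// Plain-Scala model of per-key state. `runWithState` is a hypothetical
// helper for illustration only; it is NOT a Flink API.
object KeyedCountSketch {
  // Applies `f` to each element together with the state currently stored
  // for that element (used as its own key), then stores the returned state,
  // or clears it when `f` returns None.
  def runWithState[K, S, R](input: Seq[K])(f: (K, Option[S]) => (R, Option[S])): Seq[R] = {
    val state = scala.collection.mutable.Map.empty[K, S]
    input.map { key =>
      val (out, next) = f(key, state.get(key))
      next match {
        case Some(s) => state(key) = s
        case None    => state -= key
      }
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val words = "to be or not to be".split("\\W+").filter(_.nonEmpty).toSeq
    // Same function body as in the quoted mapWithState call.
    val counts = runWithState[String, Int, (String, Int)](words) { (in, count) =>
      val newCount = count.getOrElse(0) + 1
      ((in, newCount), Some(newCount))
    }
    counts.foreach(println)
  }
}
```

In this sketch the map is created fresh on every call to `runWithState`, so a second run starts from an empty state and counts from 1 again.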



Thanks,


Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> My bad, thanks for pointing that out.
>
> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> the *withState() family of functions uses the Key/Value state interface
>> internally, so that should work.
>>
>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> Hi Jack,
>>>
>>> it seems you correctly enabled the checkpointing by calling
>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>> Checkpointed interface or use the Key/Value State interface to make sure
>>> the state of the computation is snapshotted.
>>>
>>> The documentation explains how to define your functions so that they
>>> checkpoint the state far better than I could in this post:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>
>>> I hope I've been of some help, I'll gladly help you further if you need
>>> it.
>>>
>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> what seems to be the problem?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am doing a simple word count example and want to checkpoint the
>>>>> accumulated word counts. I am not having any luck getting the counts saved
>>>>> and restored. Can someone help?
>>>>>
>>>>> env.enableCheckpointing(1000)
>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>
>>>>> ...
>>>>>
>>>>> inStream
>>>>>     .keyBy({s => s})
>>>>>     .mapWithState((in: String, count: Option[Int]) => {
>>>>>       val newCount = count.getOrElse(0) + 1
>>>>>       ((in, newCount), Some(newCount))
>>>>>     })
>>>>>     .print()
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jack Huang
>>>>>
>>>>
>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>
