Hello again,

thanks for giving my advice a shot anyway, but Aljoscha is far more
knowledgeable than me regarding Flink. :)

I hope I'm not getting mixed up again, but I think gracefully canceling your
job means you lose your job state. Am I right in saying that the state is
preserved only in case of abnormal termination (e.g. the JobManager crashes)
or if you explicitly create a savepoint?
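For reference, this is roughly how I understand the savepoint workflow from the CLI (the job ID and paths below are placeholders, so please double-check against the docs for your version):

```shell
# Trigger a savepoint for a running job; the CLI prints the savepoint path.
bin/flink savepoint <jobID>

# Cancel the job; its checkpoints are discarded, but the savepoint survives.
bin/flink cancel <jobID>

# Resubmit the job, restoring its state from the savepoint.
bin/flink run -s <savepointPath> your-job.jar
```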

On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com>
wrote:

> @Aljoscha:
> For this word count example I am using a kafka topic as the input stream.
> The problem is that when I cancel the task and restart it, the task loses
> the accumulated word counts so far and starts counting from 1 again. Am I
> missing something basic here?
>
> @Stefano:
> I also tried implementing the Checkpointed interface but had no luck
> either. Canceling and restarting the task did not restore the state. Here
> is my class:
>
> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>   .keyBy({s => s})
>>   .map(new StatefulCounter)
>
>
> class StatefulCounter extends RichMapFunction[String, (String,Int)] with
>> Checkpointed[Integer] {
>>   private var count: Integer = 0
>>
>>   def map(in: String): (String, Int) = {
>>     count += 1
>>     (in, count)
>>   }
>>   def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = {
>>     count
>>   }
>>   def restoreState(state: Integer) {
>>     count = state
>>   }
>> }
>
>
>
> Thanks,
>
>
> Jack Huang
>
> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> My bad, thanks for pointing that out.
>>
>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> the *withState() family of functions uses the Key/Value state interface
>>> internally, so that should work.
>>>
>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>> stefano.bagh...@radicalbit.io> wrote:
>>>
>>>> Hi Jack,
>>>>
>>>> it seems you correctly enabled the checkpointing by calling
>>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>> the state of the computation is snapshotted.
>>>>
>>>> The documentation explains, far better than I could in this post, how to
>>>> define your functions so that they checkpoint the state:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>
>>>> I hope I've been of some help; I'll gladly help you further if you need
>>>> it.
>>>>
>>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>> what seems to be the problem?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am doing a simple word count example and want to checkpoint the
>>>>>> accumulated word counts. I am not having any luck getting the counts
>>>>>> saved and restored. Can someone help?
>>>>>>
>>>>>> env.enableCheckpointing(1000)
>>>>>>
>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>
>>>>>>
>>>>>>>  ...
>>>>>>
>>>>>>
>>>>>>
>>>>>> inStream
>>>>>>>     .keyBy({s => s})
>>>>>>>     .mapWithState((in: String, count: Option[Int]) => {
>>>>>>>       val newCount = count.getOrElse(0) + 1
>>>>>>>       ((in, newCount), Some(newCount))
>>>>>>>     })
>>>>>>>     .print()
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jack Huang
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> BR,
>>>> Stefano Baghino
>>>>
>>>> Software Engineer @ Radicalbit
>>>>
>>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
