Any insights on this one?

Thanks !!!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> I am now trying to use mapWithState in the following way, based on some
> example code. However, looking at the DAG, it does not seem to checkpoint
> the state, and when the application restarts from the checkpoint, it
> re-partitions all the previous batches' data from Kafka.
>
> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>   @Override
>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>     MyClass nullObj = new MyClass();
>     nullObj.setImprLog(null);
>     nullObj.setNotifyLog(null);
>     MyClass current = one.or(nullObj);
>
>     if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>       return new Tuple2<>(key, null);
>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>       if (oldState != null && oldState.getNotifyLog() != null) {
>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>         return new Tuple2<>(key, oldState);
>       } else {
>         return new Tuple2<>(key, null);
>       }
>     } else {
>       return new Tuple2<>(key, null);
>     }
>   }
> };
>
>
> Please let me know whether this is the proper way, or whether I am doing
> something wrong.
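
Reading the function as posted, it never calls state.update(...), so
state.exists() would presumably stay false and nothing would be kept between
batches. The notify branch also checks getNotifyLog() on the old state but then
dereferences getImprLog(). Below is a minimal sketch of the store-then-merge
pattern. It is not Spark code: KeyState is a hypothetical stand-in exposing only
exists/get/update/remove, and Event is a hypothetical, simplified MyClass with
made-up field names, so that the logic can be run without a cluster.

```java
final class MappingSketch {
    // Hypothetical stand-in for a per-key state handle (not Spark's class).
    interface KeyState<S> {
        boolean exists();
        S get();
        void update(S value);
        void remove();
    }

    // Hypothetical, simplified MyClass: type 1 = impression, type 3 = notify.
    static final class Event {
        final int type;
        final Long imprCrtd;    // creation time carried by the impression log
        final Long notifyCrtd;  // creation time carried by the notify log
        Event(int type, Long imprCrtd, Long notifyCrtd) {
            this.type = type;
            this.imprCrtd = imprCrtd;
            this.notifyCrtd = notifyCrtd;
        }
    }

    // Returns the merged event when a notify meets a stored impression,
    // otherwise null (nothing to emit for this record).
    static Event call(Event current, KeyState<Event> state) {
        if (current.type == 1 && current.imprCrtd != null) {
            state.update(current);   // remember the impression for later records
            return null;
        }
        if (current.type == 3 && current.notifyCrtd != null && state.exists()) {
            Event stored = state.get();
            state.remove();          // pair complete, release the state
            // Stored impression with its crtd overwritten by the notify's crtd.
            return new Event(stored.type, current.notifyCrtd, null);
        }
        return null;                 // notify without a stored impression
    }

    // In-memory KeyState for a single key, for trying the function locally.
    static final class InMemoryState<S> implements KeyState<S> {
        private S value;
        public boolean exists() { return value != null; }
        public S get() { return value; }
        public void update(S v) { value = v; }
        public void remove() { value = null; }
    }
}
```

Feeding an impression and then a notify for the same key through call(...)
returns null for the first record and the merged event for the second. Whether
the real State handle behaves the same way under checkpoint recovery would
still need to be checked against the Spark version in use.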
>
>
> Thanks !!
> Abhi
>
> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
> wrote:
>
>> If you don't want to upgrade, your only option will be updateStateByKey.
>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> mapWithState supports checkpoint.
>>>
>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with KryoSerializer
>>>
>>> which is in the upcoming 1.6.1.
>>>
>>> Cheers
>>>
>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>
>>>> Does mapWithState checkpoint the data?
>>>>
>>>> When my application goes down and is restarted from the checkpoint, will
>>>> mapWithState need to recompute the previous batches' data?
>>>>
>>>> Also, to use mapWithState I will need to upgrade my application, as I am
>>>> using version 1.4.0 and mapWithState isn't supported there. Is there any
>>>> other workaround?
>>>>
>>>> Cheers!!
>>>> Abhi
>>>>
>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>
>>>>> Looks like mapWithState could help you?
>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a use case as follows in my production environment, where I
>>>>>> am reading from Kafka with a slideInterval of 1 min and a windowLength
>>>>>> of 2 hours.
>>>>>>
>>>>>> I have a JavaPairDStream in which the same key can arrive twice with
>>>>>> different values, either in the same batch or in a later batch.
>>>>>>
>>>>>> When a key appears a second time, I need to update a field in the value
>>>>>> of the earlier occurrence with a field from the later one. Keys whose
>>>>>> matching second occurrence never arrives should be discarded after 2 hours.
>>>>>>
>>>>>> At the end of each second I need to output the result to an external
>>>>>> database.
>>>>>>
>>>>>> For example :
>>>>>>
>>>>>> Suppose valueX is an object of MyClass with fields int a and String b.
>>>>>> At t=1 sec I am getting
>>>>>> key0,value0(0,"prev0")
>>>>>> key1,value1(1,"prev1")
>>>>>> key2,value2(2,"prev2")
>>>>>> key2,value3(3,"next2")
>>>>>>
>>>>>> Output to database after 1 sec
>>>>>> key2, newValue (2,"next2")
>>>>>>
>>>>>> At t=2 sec I am getting
>>>>>> key3,value4(4,"prev3")
>>>>>> key1,value5(5,"next1")
>>>>>>
>>>>>> Output to database after 2 sec
>>>>>> key1,newValue(1,"next1")
>>>>>>
>>>>>> At t=3 sec
>>>>>> key4,value6(6,"prev4")
>>>>>> key3,value7(7,"next3")
>>>>>> key5,value8(8,"prev5")
>>>>>> key5,value9(9,"next5")
>>>>>> key0,value10(10,"next0")
>>>>>>
>>>>>> Output to database after 3 sec
>>>>>> key0,newValue(0,"next0")
>>>>>> key3,newValue(4,"next3")
>>>>>> key5,newValue(8,"next5")
>>>>>>
>>>>>>
>>>>>> Please suggest how this can be achieved.
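
The pairing rule in the example above can be sketched in plain Java,
independent of Spark: keep the first value seen for a key, and when the second
one arrives, emit the first value's int field together with the second value's
String field. PairingSketch, Value, accept and evictExpired are hypothetical
names used only for illustration, and the clock is passed in as a parameter so
the 2-hour expiry can be exercised by hand. In a streaming job the expiry would
presumably be delegated to the framework's state timeout rather than done
manually.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class PairingSketch {
    // Hypothetical stand-in for MyClass with fields int a and String b.
    static final class Value {
        final int a;
        final String b;
        Value(int a, String b) { this.a = a; this.b = b; }
    }

    // A first occurrence waiting for its partner, with its arrival time.
    private static final class Pending {
        final Value value;
        final long arrivedAtMillis;
        Pending(Value value, long arrivedAtMillis) {
            this.value = value;
            this.arrivedAtMillis = arrivedAtMillis;
        }
    }

    private final long maxAgeMillis;
    private final Map<String, Pending> pending = new HashMap<>();

    PairingSketch(long maxAgeMillis) { this.maxAgeMillis = maxAgeMillis; }

    // Returns the merged value when this record completes a pair, else null.
    Value accept(String key, Value value, long nowMillis) {
        Pending first = pending.remove(key);
        if (first != null && nowMillis - first.arrivedAtMillis <= maxAgeMillis) {
            // Keep the earlier int field, take the later String field.
            return new Value(first.value.a, value.b);
        }
        // First occurrence (or the earlier one was stale): wait for a partner.
        pending.put(key, new Pending(value, nowMillis));
        return null;
    }

    // Drops unmatched entries older than maxAgeMillis; returns how many.
    int evictExpired(long nowMillis) {
        int dropped = 0;
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().arrivedAtMillis > maxAgeMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    int pendingCount() { return pending.size(); }
}
```

Replaying t=1 sec from the example, accept("key2", (2,"prev2")) returns null and
the following accept("key2", (3,"next2")) returns a value with a=2 and
b="next2", matching the expected database output; key0 and key1 stay pending
until their partners arrive in later batches.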
>>>>>>
>>>>>>
>>>>>> Thanks a lot !!!!
>>>>>> Abhi
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>
