Hi Ryan,

Reposting the code.

Basically, my use case is this: I am receiving web impression logs (listening
from Kafka) and may get the notify for those impressions either in the same
interval (for me it's 1 min) or in any later interval (up to 2 hours). When I
receive the notify for a particular impression, I need to swap the date field
in the impression with the date field in the notify log. The notify for an
impression has the same key as the impression.

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
    new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
      @Override
      public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
        // placeholder used when no value arrives for this key in the current batch
        MyClass nullObj = new MyClass();
        nullObj.setImprLog(null);
        nullObj.setNotifyLog(null);
        MyClass current = one.or(nullObj);

        if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
          // this is an impression
          return new Tuple2<>(key, null);
        } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
          // notify for the impression received
          MyClass oldState = (state.exists() ? state.get() : nullObj);
          if (oldState != null && oldState.getNotifyLog() != null) {
            // swapping the dates
            oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
            return new Tuple2<>(key, oldState);
          } else {
            return new Tuple2<>(key, null);
          }
        } else {
          return new Tuple2<>(key, null);
        }
      }
    };


return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
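
Since an impression may have to wait up to 2 hours for its notify, I was also
thinking of adding an idle timeout to the StateSpec so stale keys eventually
get dropped. Just a sketch of the idea, assuming org.apache.spark.streaming.Durations
is imported and everything else is as above:

return myPairDstream
    .mapWithState(StateSpec.function(mappingFunc)
        .timeout(Durations.minutes(120)))   // drop keys that stay idle for 2 hours
    .stateSnapshots();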


Currently I am using reduceByKeyAndWindow without the inverse function and I
am able to get the correct data. But the issue that might arise is that when I
have to restart my application from a checkpoint, it repartitions and
recomputes the previous 120 partitions, which delays the incoming batches.
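
For reference, the windowed version I am running today looks roughly like this
(a simplified sketch; the reduce function below only shows the idea of copying
the notify date onto the impression, not my exact code, and it reuses the same
MyClass / myPairDstream as above):

JavaPairDStream<String, MyClass> merged = myPairDstream.reduceByKeyAndWindow(
    new Function2<MyClass, MyClass, MyClass>() {
      @Override
      public MyClass call(MyClass a, MyClass b) {
        // keep the impression record and copy the notify date onto it
        MyClass impr = (a.getImprLog() != null) ? a : b;
        MyClass notify = (a.getNotifyLog() != null) ? a : b;
        if (impr.getImprLog() != null && notify.getNotifyLog() != null) {
          impr.getImprLog().setCrtd(notify.getNotifyLog().getCrtd());
        }
        return impr;
      }
    },
    Durations.minutes(120),   // windowLength: 2 hours
    Durations.minutes(1));    // slideInterval: 1 min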


Thanks !!
Abhi

On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> I am now trying to use mapWithState in the following way, using some
>>> example code. But, looking at the DAG, it does not seem to checkpoint
>>> the state, and when restarting the application from the checkpoint, it
>>> re-partitions all the previous batches' data from Kafka.
>>>
>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>       @Override
>>>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>         MyClass nullObj = new MyClass();
>>>         nullObj.setImprLog(null);
>>>         nullObj.setNotifyLog(null);
>>>         MyClass current = one.or(nullObj);
>>>
>>>         if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>>>           return new Tuple2<>(key, null);
>>>         } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>           MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>           if (oldState != null && oldState.getNotifyLog() != null) {
>>>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>             return new Tuple2<>(key, oldState);
>>>           } else {
>>>             return new Tuple2<>(key, null);
>>>           }
>>>         } else {
>>>           return new Tuple2<>(key, null);
>>>         }
>>>       }
>>>     };
>>>
>>>
>>> Please suggest if this is the proper way or if I am doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
>>> wrote:
>>>
>>>> If you don't want to upgrade, then your only option will be
>>>> updateStateByKey.
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>> e.g.
>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>> with KryoSerializer
>>>>>
>>>>> which is in the upcoming 1.6.1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>> abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> Does mapWithState checkpoint the data?
>>>>>>
>>>>>> When my application goes down and is restarted from a checkpoint, will
>>>>>> mapWithState need to recompute the previous batches' data?
>>>>>>
>>>>>> Also, to use mapWithState I will need to upgrade my application, as I
>>>>>> am using version 1.4.0 and mapWithState isn't supported there. Is there
>>>>>> any other workaround?
>>>>>>
>>>>>> Cheers!!
>>>>>> Abhi
>>>>>>
>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>> sebastian....@gmail.com> wrote:
>>>>>>
>>>>>>> Looks like mapWithState could help you?
>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have a use case as follows in my production environment, where I am
>>>>>>>> listening from Kafka with a slideInterval of 1 min and a windowLength
>>>>>>>> of 2 hours.
>>>>>>>>
>>>>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>>>>> again but with a different value, which might appear in the same batch
>>>>>>>> or some later batch.
>>>>>>>>
>>>>>>>> When the key appears a second time, I need to update a field in the
>>>>>>>> value of the previous key with a field from the later value. The keys
>>>>>>>> for which the matching second value does not come should be rejected
>>>>>>>> after 2 hours.
>>>>>>>>
>>>>>>>> At the end of each second I need to output the result to an external
>>>>>>>> database.
>>>>>>>>
>>>>>>>> For example :
>>>>>>>>
>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b.
>>>>>>>> At t=1sec I am getting
>>>>>>>> key0,value0(0,"prev0")
>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>
>>>>>>>> Output to database after 1 sec
>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>
>>>>>>>> At t=2 sec getting
>>>>>>>> key3,value4(4,"prev3")
>>>>>>>> key1,value5(5,"next1")
>>>>>>>>
>>>>>>>> Output to database after 2 sec
>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>
>>>>>>>> At t=3 sec
>>>>>>>> key4,value6(6,"prev4")
>>>>>>>> key3,value7(7,"next3")
>>>>>>>> key5,value5(8,"prev5")
>>>>>>>> key5,value5(9,"next5")
>>>>>>>> key0,value0(10,"next0")
>>>>>>>>
>>>>>>>> Output to database after 3 sec
>>>>>>>> key0,newValue(0,"next0")
>>>>>>>> key3,newValue(4,"next3")
>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>
>>>>>>>>
>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks a lot !!!!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
