Hey Abhi,

Using reduceByKeyAndWindow and mapWithState together will trigger the bug
in SPARK-6847. Here is a workaround that triggers the checkpoint manually:

    JavaMapWithStateDStream<...> stateDStream =
        myPairDstream.mapWithState(StateSpec.function(mappingFunc));
    stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
      @Override
      public void call(JavaRDD<...> rdd) throws Exception {
        rdd.checkpoint(); // mark the state RDD for checkpointing
        rdd.count();      // force evaluation so the checkpoint is written
      }
    });
    return stateDStream.stateSnapshots();
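
One thing to double check (a general reminder, not specific to this bug):
rdd.checkpoint() only takes effect if a checkpoint directory has been set on
the context. A minimal sketch, where the app name and directory path are only
placeholders:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf conf = new SparkConf().setAppName("ImpressionNotifyJoin");
    // 1-minute batches, matching the interval mentioned below
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.minutes(1));
    // Must be set before rdd.checkpoint() can be used; the path is a placeholder
    jssc.checkpoint("hdfs:///tmp/spark-checkpoints");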


On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Ryan,
>
> Reposting the code.
>
> Basically my use case is this - I am receiving web impression logs and may
> get the notify (listening from Kafka) for those impressions in the same
> interval (for me it is 1 min) or in any later interval (up to 2 hours).
> When I receive the notify for a particular impression, I need to swap the
> date field in the impression with the date field in the notify log. The
> notify for an impression has the same key as the impression.
>
> static Function3<String, Optional<MyClass>, State<MyClass>,
>     Tuple2<String, MyClass>> mappingFunc =
>   new Function3<String, Optional<MyClass>, State<MyClass>,
>       Tuple2<String, MyClass>>() {
>     @Override
>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>         State<MyClass> state) {
>       MyClass nullObj = new MyClass();
>       nullObj.setImprLog(null);
>       nullObj.setNotifyLog(null);
>       MyClass current = one.or(nullObj);
>
>       if (current != null && current.getImprLog() != null
>           && current.getMyClassType() == 1 /* this is an impression */) {
>         return new Tuple2<>(key, null);
>       } else if (current.getNotifyLog() != null
>           && current.getMyClassType() == 3 /* notify for the impression */) {
>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>         if (oldState != null && oldState.getNotifyLog() != null) {
>           // swapping the dates
>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>           return new Tuple2<>(key, oldState);
>         } else {
>           return new Tuple2<>(key, null);
>         }
>       } else {
>         return new Tuple2<>(key, null);
>       }
>     }
>   };
>
>
> return myPairDstream.mapWithState(StateSpec.function(mappingFunc))
>     .stateSnapshots();
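>
> For the 2-hour expiry of keys whose notify never arrives, I was thinking
> of adding a timeout to the StateSpec, something like the sketch below (not
> tested yet; Durations is org.apache.spark.streaming.Durations):
>
> // Keys idle for 2 hours are timed out; the mapping function is then
> // called one last time with an absent value and state.isTimingOut() true.
> return myPairDstream
>     .mapWithState(StateSpec.function(mappingFunc)
>         .timeout(Durations.minutes(120)))
>     .stateSnapshots();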
>
>
> Currently I am using reduceByKeyAndWindow without the inverse function and
> I am able to get the correct data. But the issue that might arise is that
> when I have to restart my application from checkpoint, it repartitions and
> recomputes the previous 120 partitions, which delays the incoming batches.
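>
> For reference, the variant with an inverse reduce function would look
> roughly like the sketch below (mergeLogs/unmergeLogs are placeholders for
> my merge logic, and this only applies if that operation is invertible):
>
> JavaPairDStream<String, MyClass> windowed =
>     myPairDstream.reduceByKeyAndWindow(
>         new Function2<MyClass, MyClass, MyClass>() {
>           @Override
>           public MyClass call(MyClass acc, MyClass incoming) {
>             return mergeLogs(acc, incoming);   // add new data to the window
>           }
>         },
>         new Function2<MyClass, MyClass, MyClass>() {
>           @Override
>           public MyClass call(MyClass acc, MyClass leaving) {
>             return unmergeLogs(acc, leaving);  // remove data leaving the window
>           }
>         },
>         Durations.minutes(120),                // windowLength: 2 hours
>         Durations.minutes(1));                 // slideInterval: 1 min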
>
>
> Thanks !!
> Abhi
>
> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Could you post how you use mapWithState? By default, it should do
>> checkpointing every 10 batches.
>> However, there is a known issue that prevents mapWithState from
>> checkpointing in some special cases:
>> https://issues.apache.org/jira/browse/SPARK-6847
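>>
>> If you want to control the checkpoint interval explicitly, you can also
>> set it on the state DStream yourself; a one-line sketch, where stateDStream
>> stands for whatever mapWithState returns and Durations is
>> org.apache.spark.streaming.Durations:
>>
>> // Checkpoint the state RDDs every 10 minutes
>> stateDStream.checkpoint(Durations.minutes(10));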
>>
>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> Any insights on this one?
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> I am now trying to use mapWithState in the following way, based on some
>>>> example code. But looking at the DAG it does not seem to checkpoint the
>>>> state, and when restarting the application from checkpoint it
>>>> re-partitions all the previous batches' data from Kafka.
>>>>
>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>   new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>       Tuple2<String, MyClass>>() {
>>>>     @Override
>>>>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>         State<MyClass> state) {
>>>>       MyClass nullObj = new MyClass();
>>>>       nullObj.setImprLog(null);
>>>>       nullObj.setNotifyLog(null);
>>>>       MyClass current = one.or(nullObj);
>>>>
>>>>       if (current != null && current.getImprLog() != null
>>>>           && current.getMyClassType() == 1) {
>>>>         return new Tuple2<>(key, null);
>>>>       } else if (current.getNotifyLog() != null
>>>>           && current.getMyClassType() == 3) {
>>>>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>         if (oldState != null && oldState.getNotifyLog() != null) {
>>>>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>           return new Tuple2<>(key, oldState);
>>>>         } else {
>>>>           return new Tuple2<>(key, null);
>>>>         }
>>>>       } else {
>>>>         return new Tuple2<>(key, null);
>>>>       }
>>>>     }
>>>>   };
>>>>
>>>>
>>>> Please suggest whether this is the proper way or whether I am doing something wrong.
>>>>
>>>>
>>>> Thanks !!
>>>> Abhi
>>>>
>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
>>>> wrote:
>>>>
>>>>> If you don't want to upgrade, then your only option will be updateStateByKey.
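>>>>>
>>>>> A rough sketch of what that would look like on 1.4 (MyClass and the
>>>>> merge() helper are placeholders; Optional here is the Guava one that
>>>>> Spark 1.4 uses):
>>>>>
>>>>> JavaPairDStream<String, MyClass> state =
>>>>>     myPairDstream.updateStateByKey(
>>>>>         new Function2<List<MyClass>, Optional<MyClass>, Optional<MyClass>>() {
>>>>>           @Override
>>>>>           public Optional<MyClass> call(List<MyClass> newValues,
>>>>>               Optional<MyClass> oldState) {
>>>>>             MyClass merged = oldState.orNull();
>>>>>             for (MyClass v : newValues) {
>>>>>               merged = merge(merged, v); // hypothetical impression/notify merge
>>>>>             }
>>>>>             return Optional.fromNullable(merged);
>>>>>           }
>>>>>         });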
>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> mapWithState supports checkpointing.
>>>>>>
>>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>   with KryoSerializer
>>>>>> which is in the upcoming 1.6.1.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>
>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>
>>>>>>> When my application goes down and is restarted from checkpoint, will
>>>>>>> mapWithState need to recompute the previous batches' data?
>>>>>>>
>>>>>>> Also, to use mapWithState I will need to upgrade my application, as I
>>>>>>> am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>> there any other workaround?
>>>>>>>
>>>>>>> Cheers!!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Looks like mapWithState could help you?
>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have a use case like the following in my production environment,
>>>>>>>>> where I am listening from Kafka with a slideInterval of 1 min and a
>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>
>>>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>>>> key but with a different value, which might appear in the same
>>>>>>>>> batch or some later batch.
>>>>>>>>>
>>>>>>>>> When a key appears the second time I need to update a field in the
>>>>>>>>> value of the previous record with a field from the later one. Keys
>>>>>>>>> for which the matching record never arrives should be rejected
>>>>>>>>> after 2 hours.
>>>>>>>>>
>>>>>>>>> At the end of each second I need to output the result to an
>>>>>>>>> external database.
>>>>>>>>>
>>>>>>>>> For example :
>>>>>>>>>
>>>>>>>>> Suppose each valueX is an object of MyClass with fields int a and String b
>>>>>>>>> At t=1 sec I am getting
>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>
>>>>>>>>> Output to database after 1 sec
>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>
>>>>>>>>> At t=2 sec getting
>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>
>>>>>>>>> Output to database after 2 sec
>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>
>>>>>>>>> At t=3 sec
>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>
>>>>>>>>> Output to database after 3 sec
>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks a lot !!!!
>>>>>>>>> Abhi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>
