Sorry, I forgot to mention that you should also call `rdd.count()` on the
"reduceByKey" DStream as well. Could you try it and see if it works?

On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Ryan,
>
> I am using mapWithState after doing reduceByKey.
>
> Right now I am using mapWithState as you suggested and triggering the
> count manually.
>
> But I am still unable to see any checkpointing taking place. In the DAG I
> can see that the reduceByKey operations for the previous batches are also
> being computed.
>
>
> Thanks
> Abhi
>
>
> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Using reduceByKeyAndWindow and mapWithState together will trigger the bug
>> in SPARK-6847. Here is a workaround that triggers checkpointing manually:
>>
>>     JavaMapWithStateDStream<...> stateDStream =
>>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>     // Run a job directly on this stream every batch so its RDDs get
>>     // checkpointed (manual workaround for SPARK-6847).
>>     stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
>>       @Override
>>       public void call(JavaRDD<...> rdd) throws Exception {
>>         rdd.count();
>>       }
>>     });
>>     return stateDStream.stateSnapshots();
>>
>>
>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> Hi Ryan,
>>>
>>> Reposting the code.
>>>
>>> Basically, my use case is something like this: I am receiving web
>>> impression logs and may get the notify (listening from Kafka) for those
>>> impressions in the same interval (for me it's 1 min) or in any later
>>> interval (up to 2 hours). When I receive the notify for a particular
>>> impression, I need to swap the date field in the impression with the date
>>> field in the notify log. The notify for an impression has the same key as
>>> the impression.
>>>
>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>     Tuple2<String, MyClass>> mappingFunc =
>>>     new Function3<String, Optional<MyClass>, State<MyClass>,
>>>         Tuple2<String, MyClass>>() {
>>>   @Override
>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>       State<MyClass> state) {
>>>     MyClass nullObj = new MyClass();
>>>     nullObj.setImprLog(null);
>>>     nullObj.setNotifyLog(null);
>>>     MyClass current = one.or(nullObj);
>>>
>>>     if (current.getImprLog() != null
>>>         && current.getMyClassType() == 1 /* this is impression */) {
>>>       // Store the impression so a later notify can find it.
>>>       state.update(current);
>>>       return new Tuple2<>(key, null);
>>>     } else if (current.getNotifyLog() != null
>>>         && current.getMyClassType() == 3 /* notify for the impression received */) {
>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>       // The stored state is an impression, so check its impression log.
>>>       if (oldState.getImprLog() != null) {
>>>         // Swap the dates: copy the notify's crtd into the impression.
>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>         state.update(oldState);
>>>         return new Tuple2<>(key, oldState);
>>>       } else {
>>>         return new Tuple2<>(key, null);
>>>       }
>>>     } else {
>>>       return new Tuple2<>(key, null);
>>>     }
>>>
>>>   }
>>> };
>>>
>>>
>>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc))
>>>     .stateSnapshots();
>>>
>>>
>>> Currently I am using reduceByKeyAndWindow without the inverse function,
>>> and I am able to get the correct data. But an issue might arise when I
>>> have to restart my application from a checkpoint: it repartitions and
>>> recomputes the previous 120 partitions, which delays the incoming batches.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Could you post how you use mapWithState? By default, it should do
>>>> checkpointing every 10 batches.
>>>> However, there is a known issue that prevents mapWithState from
>>>> checkpointing in some special cases:
>>>> https://issues.apache.org/jira/browse/SPARK-6847
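The "every 10 batches" default can be made concrete with a little arithmetic. This is a hypothetical plain-Java sketch, not Spark code, assuming the checkpoint interval is 10 × the batch interval and that batches are numbered from 1 after the stream starts. With 1-minute batches the state would be checkpointed every 10 minutes, so after a failure only the batches since the last completed checkpoint should need recomputing (when SPARK-6847 does not get in the way). `CheckpointCadence` and its methods are illustrative names.

```java
/** Hypothetical sketch of a "checkpoint every N batches" cadence (not a Spark API). */
final class CheckpointCadence {
    private CheckpointCadence() {}

    /** Assumed default multiplier: state is checkpointed every 10 batch intervals. */
    static final int DEFAULT_MULTIPLIER = 10;

    /** Checkpoint interval in seconds for a batch interval given in seconds. */
    static long checkpointIntervalSeconds(long batchSeconds) {
        return batchSeconds * DEFAULT_MULTIPLIER;
    }

    /** True if the n-th batch (counting from 1) lands on a checkpoint boundary. */
    static boolean isCheckpointBatch(long batchNumber) {
        return batchNumber > 0 && batchNumber % DEFAULT_MULTIPLIER == 0;
    }

    /**
     * Batches processed since the most recent checkpoint boundary, i.e. roughly
     * what would be replayed after a failure right after batchNumber, assuming
     * that checkpoint completed.
     */
    static long batchesSinceLastCheckpoint(long batchNumber) {
        return batchNumber % DEFAULT_MULTIPLIER;
    }
}
```

For instance, `checkpointIntervalSeconds(60)` gives 600 seconds, and a failure after batch 37 would leave 7 batches (31 to 37) since the checkpoint at batch 30.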
>>>>
>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Any insights on this one?
>>>>>
>>>>>
>>>>> Thanks !!!
>>>>> Abhi
>>>>>
>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>>> abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> I am now trying to use mapWithState in the following way, based on
>>>>>> some example code. But, looking at the DAG, it does not seem to
>>>>>> checkpoint the state, and when I restart the application from the
>>>>>> checkpoint, it re-partitions all of the previous batches' data from Kafka.
>>>>>>
>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>         Tuple2<String, MyClass>>() {
>>>>>>   @Override
>>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>>       State<MyClass> state) {
>>>>>>     MyClass nullObj = new MyClass();
>>>>>>     nullObj.setImprLog(null);
>>>>>>     nullObj.setNotifyLog(null);
>>>>>>     MyClass current = one.or(nullObj);
>>>>>>
>>>>>>     if (current.getImprLog() != null && current.getMyClassType() == 1) {
>>>>>>       // Store the impression so a later notify can find it.
>>>>>>       state.update(current);
>>>>>>       return new Tuple2<>(key, null);
>>>>>>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>>       // The stored state is an impression, so check its impression log.
>>>>>>       if (oldState.getImprLog() != null) {
>>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>         state.update(oldState);
>>>>>>         return new Tuple2<>(key, oldState);
>>>>>>       } else {
>>>>>>         return new Tuple2<>(key, null);
>>>>>>       }
>>>>>>     } else {
>>>>>>       return new Tuple2<>(key, null);
>>>>>>     }
>>>>>>
>>>>>>   }
>>>>>> };
>>>>>>
>>>>>>
>>>>>> Please let me know whether this is the proper way or whether I am doing
>>>>>> something wrong.
>>>>>>
>>>>>>
>>>>>> Thanks !!
>>>>>> Abhi
>>>>>>
>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>>> sebastian....@gmail.com> wrote:
>>>>>>
>>>>>>> If you don't want to upgrade, your only option will be
>>>>>>> updateStateByKey then.
>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> mapWithState supports checkpoint.
>>>>>>>>
>>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>>> e.g.
>>>>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>>> with KryoSerializer
>>>>>>>>
>>>>>>>> which is in the upcoming 1.6.1.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>>
>>>>>>>>> When my application goes down and is restarted from a checkpoint,
>>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>>
>>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>>>> there any other workaround?
>>>>>>>>>
>>>>>>>>> Cheers!!
>>>>>>>>> Abhi
>>>>>>>>>
>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <
>>>>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I have a use case like the following in my production environment,
>>>>>>>>>>> where I am listening to Kafka with a slideInterval of 1 min and a
>>>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>>>
>>>>>>>>>>> I have a JavaPairDStream in which each key arrives a second time
>>>>>>>>>>> with a different value; the second occurrence might appear in the
>>>>>>>>>>> same batch or in a later batch.
>>>>>>>>>>>
>>>>>>>>>>> When a key appears for the second time, I need to update a field in
>>>>>>>>>>> the earlier value with a field from the later value. Keys whose
>>>>>>>>>>> second occurrence does not arrive should be rejected after 2 hours.
>>>>>>>>>>>
>>>>>>>>>>> At the end of each second I need to output the result to an
>>>>>>>>>>> external database.
>>>>>>>>>>>
>>>>>>>>>>> For example :
>>>>>>>>>>>
>>>>>>>>>>> Suppose valueX is an object of MyClass with fields int a and String b.
>>>>>>>>>>> At t=1 sec I am getting
>>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>>>
>>>>>>>>>>> At t=2 sec getting
>>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>>
>>>>>>>>>>> At t=3 sec
>>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>>> key5,value8(8,"prev5")
>>>>>>>>>>> key5,value9(9,"next5")
>>>>>>>>>>> key0,value10(10,"next0")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>>> key5,newValue(8,"next5")
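The per-key logic in this example can be prototyped without a cluster. Below is a minimal in-memory sketch, assuming each key arrives exactly twice, that the merged value keeps `a` from the first record and `b` from the second (which matches the expected outputs listed), and that unmatched keys are dropped once they are 2 hours (7200 s) old. `Value`, `PairingSketch`, and `processBatch` are hypothetical names. In Spark 1.6 the same idea would live inside a mapWithState mapping function holding the first record in `State`, with the 2-hour rejection roughly corresponding to `StateSpec`'s idle timeout; this sketch does not use Spark.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for MyClass in the example: an int a and a String b. */
final class Value {
    final int a;
    final String b;

    Value(int a, String b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public String toString() {
        return "(" + a + ",\"" + b + "\")";
    }
}

/** Hypothetical in-memory sketch of per-key pairing with an expiry. */
final class PairingSketch {
    /** A first-seen value plus the batch time (seconds) it arrived. */
    private static final class Pending {
        final Value first;
        final long arrivedAt;

        Pending(Value first, long arrivedAt) {
            this.first = first;
            this.arrivedAt = arrivedAt;
        }
    }

    private final long timeoutSeconds;
    private final Map<String, Pending> state = new HashMap<>();

    PairingSketch(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    /**
     * Processes one batch of (key, value) records in arrival order and returns
     * the completed pairs, sorted by key only to make the output deterministic.
     */
    Map<String, Value> processBatch(long batchTime, List<Map.Entry<String, Value>> records) {
        // Reject keys whose second record did not arrive within the timeout.
        Iterator<Pending> it = state.values().iterator();
        while (it.hasNext()) {
            if (batchTime - it.next().arrivedAt >= timeoutSeconds) {
                it.remove();
            }
        }

        Map<String, Value> output = new TreeMap<>();
        for (Map.Entry<String, Value> record : records) {
            String key = record.getKey();
            Pending pending = state.remove(key);
            if (pending == null) {
                // First occurrence: remember it until its partner arrives.
                state.put(key, new Pending(record.getValue(), batchTime));
            } else {
                // Second occurrence: keep the first value's a, take the later value's b.
                output.put(key, new Value(pending.first.a, record.getValue().b));
            }
        }
        return output;
    }
}
```

Feeding it the three batches of the example at t=1, 2 and 3 (for instance with `List.of(Map.entry("key0", new Value(0, "prev0")), ...)`) should produce key2, then key1, then key0/key3/key5 with the values listed as output.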
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>>> Abhi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
