Sorry, I forgot to tell you that you should also call `rdd.count()` for the "reduceByKey" stream as well. Could you try it and see if it works?
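The reason an otherwise-useless `rdd.count()` helps at all is that Spark transformations are lazy: nothing is computed, and therefore nothing can be checkpointed, until some action forces evaluation. A plain-Java analogy using `java.util.stream`, purely illustrative and independent of Spark (the class name `LazyDemo` is made up for this sketch):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    // Returns how many elements the map() stage actually processed.
    public static int run() {
        AtomicInteger touched = new AtomicInteger(0);
        // Building the pipeline executes nothing yet, like defining an RDD/DStream transformation.
        Stream<Integer> pipeline = List.of(1, 2, 3).stream()
                .map(x -> { touched.incrementAndGet(); return x * 2; });
        int before = touched.get();        // still 0: evaluation is deferred
        pipeline.reduce(0, Integer::sum);  // terminal op plays the role of rdd.count()
        return touched.get() - before;     // all 3 elements were processed only now
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the same spirit, `stateSnapshots()` alone defines more lazy transformations, which is why the workaround below forces an action on the `mapWithState` stream itself.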
On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:

> Hi Ryan,
>
> I am using mapWithState after doing reduceByKey.
>
> I am right now using mapWithState as you suggested and triggering the
> count manually. But I am still unable to see any checkpointing taking
> place. In the DAG I can see that the reduceByKey operations for the
> previous batches are also being computed.
>
> Thanks
> Abhi
>
> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Using reduceByKeyAndWindow and mapWithState together will trigger the
>> bug in SPARK-6847. Here is a workaround to trigger the checkpoint
>> manually:
>>
>> JavaMapWithStateDStream<...> stateDStream =
>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>> stateDStream.foreachRDD(new Function<JavaRDD<...>, Void>() {
>>   @Override
>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>     rdd.count();  // an action forces evaluation so the state RDD gets checkpointed
>>     return null;
>>   }
>> });
>> return stateDStream.stateSnapshots();
>>
>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> Reposting the code.
>>>
>>> Basically my use case is something like this: I receive web impression
>>> logs, and may get the notify (listening from Kafka) for those
>>> impressions in the same interval (for me it is 1 min) or in any later
>>> interval (up to 2 hours). When I receive the notify for a particular
>>> impression, I need to swap the date field in the impression with the
>>> date field in the notify log. The notify for an impression has the same
>>> key as the impression.
>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>   @Override
>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>     MyClass nullObj = new MyClass();
>>>     nullObj.setImprLog(null);
>>>     nullObj.setNotifyLog(null);
>>>     MyClass current = one.or(nullObj);  // never null after or()
>>>
>>>     if (current.getImprLog() != null && current.getMyClassType() == 1) {  // this is an impression
>>>       return new Tuple2<>(key, null);
>>>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {  // notify for the impression received
>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>       if (oldState.getNotifyLog() != null) {
>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());  // swapping the dates
>>>         return new Tuple2<>(key, oldState);
>>>       } else {
>>>         return new Tuple2<>(key, null);
>>>       }
>>>     } else {
>>>       return new Tuple2<>(key, null);
>>>     }
>>>   }
>>> };
>>>
>>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>
>>> Currently I am using reduceByKeyAndWindow without the inverse function
>>> and I am able to get the correct data. But the issue that might arise is
>>> that when I have to restart my application from a checkpoint, it
>>> repartitions and computes the previous 120 partitions, which delays the
>>> incoming batches.
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Could you post how you use mapWithState? By default, it should do
>>>> checkpointing every 10 batches.
>>>> However, there is a known issue that prevents mapWithState from
>>>> checkpointing in some special cases:
>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>
>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Any insights on this one?
>>>>>
>>>>> Thanks !!!
>>>>> Abhi
>>>>>
>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> I am now trying to use mapWithState in the following way, based on
>>>>>> some example code. But, looking at the DAG, it does not seem to
>>>>>> checkpoint the state, and when restarting the application from a
>>>>>> checkpoint it reprocesses all the previous batches' data from Kafka.
>>>>>>
>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>>   @Override
>>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>>>>     MyClass nullObj = new MyClass();
>>>>>>     nullObj.setImprLog(null);
>>>>>>     nullObj.setNotifyLog(null);
>>>>>>     MyClass current = one.or(nullObj);
>>>>>>
>>>>>>     if (current.getImprLog() != null && current.getMyClassType() == 1) {
>>>>>>       return new Tuple2<>(key, null);
>>>>>>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>>       if (oldState.getNotifyLog() != null) {
>>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>         return new Tuple2<>(key, oldState);
>>>>>>       } else {
>>>>>>         return new Tuple2<>(key, null);
>>>>>>       }
>>>>>>     } else {
>>>>>>       return new Tuple2<>(key, null);
>>>>>>     }
>>>>>>   }
>>>>>> };
>>>>>>
>>>>>> Please suggest if this is the proper way, or whether I am doing
>>>>>> something wrong.
>>>>>>
>>>>>> Thanks !!
>>>>>> Abhi
>>>>>>
>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>
>>>>>>> If you don't want to upgrade, your only option will be
>>>>>>> updateStateByKey then.
>>>>>>>
>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>>> mapWithState supports checkpointing.
>>>>>>>>
>>>>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>>>> SPARK-12591 (NullPointerException using checkpointed mapWithState
>>>>>>>> with KryoSerializer), which is in the upcoming 1.6.1.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>>
>>>>>>>>> When my application goes down and is restarted from a checkpoint,
>>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>>
>>>>>>>>> Also, to use mapWithState I will need to upgrade my application,
>>>>>>>>> as I am using version 1.4.0 and mapWithState isn't supported
>>>>>>>>> there. Is there any other workaround?
>>>>>>>>>
>>>>>>>>> Cheers!!
>>>>>>>>> Abhi
>>>>>>>>>
>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>>
>>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I have a use case as follows in my production environment, where
>>>>>>>>>>> I am listening from Kafka with a slideInterval of 1 min and a
>>>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>>>
>>>>>>>>>>> I have a JavaPairDStream where for each key I get the same key
>>>>>>>>>>> again but with a different value, which might appear in the same
>>>>>>>>>>> batch or some later batch.
>>>>>>>>>>>
>>>>>>>>>>> When the key appears the second time, I need to update a field
>>>>>>>>>>> in the value of the previous key with a field in the later key.
>>>>>>>>>>> Keys for which the matching second key does not arrive should be
>>>>>>>>>>> rejected after 2 hours.
>>>>>>>>>>>
>>>>>>>>>>> At the end of each batch interval I need to output the result to
>>>>>>>>>>> an external database.
>>>>>>>>>>>
>>>>>>>>>>> For example, suppose each value is an object of MyClass with
>>>>>>>>>>> fields (int a, String b).
>>>>>>>>>>>
>>>>>>>>>>> At t=1 sec I am getting:
>>>>>>>>>>> key0, value0 (0, "prev0")
>>>>>>>>>>> key1, value1 (1, "prev1")
>>>>>>>>>>> key2, value2 (2, "prev2")
>>>>>>>>>>> key2, value3 (3, "next2")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 1 sec:
>>>>>>>>>>> key2, newValue (2, "next2")
>>>>>>>>>>>
>>>>>>>>>>> At t=2 sec I am getting:
>>>>>>>>>>> key3, value4 (4, "prev3")
>>>>>>>>>>> key1, value5 (5, "next1")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 2 sec:
>>>>>>>>>>> key1, newValue (1, "next1")
>>>>>>>>>>>
>>>>>>>>>>> At t=3 sec:
>>>>>>>>>>> key4, value6 (6, "prev4")
>>>>>>>>>>> key3, value7 (7, "next3")
>>>>>>>>>>> key5, value5 (8, "prev5")
>>>>>>>>>>> key5, value5 (9, "next5")
>>>>>>>>>>> key0, value0 (10, "next0")
>>>>>>>>>>>
>>>>>>>>>>> Output to database after 3 sec:
>>>>>>>>>>> key0, newValue (0, "next0")
>>>>>>>>>>> key3, newValue (4, "next3")
>>>>>>>>>>> key5, newValue (8, "next5")
>>>>>>>>>>>
>>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>>> Abhi
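The per-key merge described in the original question can be sketched outside Spark with an ordinary map standing in for `State<MyClass>`: remember an impression until its notify arrives, then emit the impression's fields with the notify's date. A minimal sketch; `Event`, `onEvent`, and the field layout are hypothetical stand-ins for `MyClass` (type 1 = impression, type 3 = notify, matching the checks in the mapping function above):

```java
import java.util.HashMap;
import java.util.Map;

public class SwapSketch {
    // Hypothetical stand-in for MyClass: type 1 = impression, type 3 = notify.
    record Event(int type, int a, String crtd) {}

    // Pending impressions keyed by impression key, standing in for State<MyClass>.
    private final Map<String, Event> state = new HashMap<>();

    // Returns the merged value to emit, or null when nothing is ready yet,
    // mirroring the Tuple2<>(key, null) cases in the mapping function.
    public Event onEvent(String key, Event e) {
        if (e.type() == 1) {           // impression: remember it, emit nothing
            state.put(key, e);
            return null;
        }
        if (e.type() == 3) {           // notify: merge with the stored impression
            Event impr = state.remove(key);
            if (impr == null) return null;  // no matching impression seen yet
            return new Event(1, impr.a(), e.crtd());  // keep a, take notify's date
        }
        return null;
    }

    public static void main(String[] args) {
        SwapSketch s = new SwapSketch();
        s.onEvent("key2", new Event(1, 2, "prev2"));
        System.out.println(s.onEvent("key2", new Event(3, 3, "next2")));
    }
}
```

Replaying the t=1 sec example from the thread, `key2` with (2, "prev2") followed by `key2` with (3, "next2") yields (2, "next2"), matching the expected database output; the 2-hour expiry would map onto a state timeout in the real streaming job.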