Hey Abhi,

Using reduceByKeyAndWindow and mapWithState together will trigger the bug in SPARK-6847. Here is a workaround to trigger the checkpoint manually:
JavaMapWithStateDStream<...> stateDStream =
    myPairDstream.mapWithState(StateSpec.function(mappingFunc));

stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
  @Override
  public void call(JavaRDD<...> rdd) throws Exception {
    rdd.count();
  }
});

return stateDStream.stateSnapshots();

On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:

> Hi Ryan,
>
> Reposting the code.
>
> Basically my use case is something like this: I am receiving web
> impression logs and may get the notify (listening from Kafka) for those
> impressions in the same interval (for me it is 1 min) or in any later
> interval (up to 2 hours). When I receive the notify for a particular
> impression, I need to swap the date field in the impression with the date
> field in the notify log. The notify for an impression has the same key as
> the impression.
>
> static Function3<String, Optional<MyClass>, State<MyClass>,
>     Tuple2<String, MyClass>> mappingFunc =
>   new Function3<String, Optional<MyClass>, State<MyClass>,
>       Tuple2<String, MyClass>>() {
>     @Override
>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>         State<MyClass> state) {
>       MyClass nullObj = new MyClass();
>       nullObj.setImprLog(null);
>       nullObj.setNotifyLog(null);
>       MyClass current = one.or(nullObj);
>
>       if (current != null && current.getImprLog() != null
>           && current.getMyClassType() == 1 /* this is an impression */) {
>         return new Tuple2<>(key, null);
>       } else if (current.getNotifyLog() != null
>           && current.getMyClassType() == 3 /* notify for the impression received */) {
>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>         if (oldState != null && oldState.getNotifyLog() != null) {
>           // swapping the dates
>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>           return new Tuple2<>(key, oldState);
>         } else {
>           return new Tuple2<>(key, null);
>         }
>       } else {
>         return new Tuple2<>(key, null);
>       }
>     }
>   };
>
> return myPairDstream.mapWithState(StateSpec.function(mappingFunc))
>     .stateSnapshots();
>
> Currently I am using reduceByKeyAndWindow without the inverse function and
> I am able to get the correct data. But the issue that might arise is that
> when I have to restart my application from the checkpoint, it repartitions
> and recomputes the previous 120 batches, which delays the incoming batches.
>
> Thanks !!
> Abhi
>
> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Could you post how you use mapWithState? By default, it should do
>> checkpointing every 10 batches. However, there is a known issue that
>> prevents mapWithState from checkpointing in some special cases:
>> https://issues.apache.org/jira/browse/SPARK-6847
>>
>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> Any insights on this one?
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> I am now trying to use mapWithState in the following way, using some
>>>> example code. But, looking at the DAG, it does not seem to checkpoint
>>>> the state, and when restarting the application from the checkpoint it
>>>> re-partitions all the previous batches' data from Kafka.
>>>>
>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>   new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>       Tuple2<String, MyClass>>() {
>>>>     @Override
>>>>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>         State<MyClass> state) {
>>>>       MyClass nullObj = new MyClass();
>>>>       nullObj.setImprLog(null);
>>>>       nullObj.setNotifyLog(null);
>>>>       MyClass current = one.or(nullObj);
>>>>
>>>>       if (current != null && current.getImprLog() != null
>>>>           && current.getMyClassType() == 1) {
>>>>         return new Tuple2<>(key, null);
>>>>       } else if (current.getNotifyLog() != null
>>>>           && current.getMyClassType() == 3) {
>>>>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>         if (oldState != null && oldState.getNotifyLog() != null) {
>>>>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>           return new Tuple2<>(key, oldState);
>>>>         } else {
>>>>           return new Tuple2<>(key, null);
>>>>         }
>>>>       } else {
>>>>         return new Tuple2<>(key, null);
>>>>       }
>>>>     }
>>>>   };
>>>>
>>>> Please suggest if this is the proper way, or am I doing something wrong?
>>>>
>>>> Thanks !!
>>>> Abhi
>>>>
>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
>>>> wrote:
>>>>
>>>>> If you don't want to upgrade, then your only option will be
>>>>> updateStateByKey.
>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> mapWithState supports checkpointing.
>>>>>>
>>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>> SPARK-12591 (NullPointerException using checkpointed mapWithState
>>>>>> with KryoSerializer), which is in the upcoming 1.6.1.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>
>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>
>>>>>>> When my application goes down and is restarted from the checkpoint,
>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>
>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>> there any other workaround?
>>>>>>>
>>>>>>> Cheers!!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Looks like mapWithState could help you?
>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have a use case as follows in my production environment, where I
>>>>>>>>> am listening to Kafka with a slideInterval of 1 min and a
>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>
>>>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>>>> key again with a different value, which might appear in the same
>>>>>>>>> batch or in some later batch.
>>>>>>>>>
>>>>>>>>> When the key appears a second time, I need to update a field in
>>>>>>>>> the value of the earlier record with a field from the later one.
>>>>>>>>> Keys for which the matching record never arrives should be
>>>>>>>>> rejected after 2 hours.
>>>>>>>>>
>>>>>>>>> At the end of each second I need to output the result to an
>>>>>>>>> external database.
>>>>>>>>>
>>>>>>>>> For example:
>>>>>>>>>
>>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b.
>>>>>>>>>
>>>>>>>>> At t=1 sec I am getting:
>>>>>>>>> key0, value0 (0, "prev0")
>>>>>>>>> key1, value1 (1, "prev1")
>>>>>>>>> key2, value2 (2, "prev2")
>>>>>>>>> key2, value3 (3, "next2")
>>>>>>>>>
>>>>>>>>> Output to database after 1 sec:
>>>>>>>>> key2, newValue (2, "next2")
>>>>>>>>>
>>>>>>>>> At t=2 sec I am getting:
>>>>>>>>> key3, value4 (4, "prev3")
>>>>>>>>> key1, value5 (5, "next1")
>>>>>>>>>
>>>>>>>>> Output to database after 2 sec:
>>>>>>>>> key1, newValue (1, "next1")
>>>>>>>>>
>>>>>>>>> At t=3 sec:
>>>>>>>>> key4, value6 (6, "prev4")
>>>>>>>>> key3, value7 (7, "next3")
>>>>>>>>> key5, value5 (8, "prev5")
>>>>>>>>> key5, value5 (9, "next5")
>>>>>>>>> key0, value0 (10, "next0")
>>>>>>>>>
>>>>>>>>> Output to database after 3 sec:
>>>>>>>>> key0, newValue (0, "next0")
>>>>>>>>> key3, newValue (4, "next3")
>>>>>>>>> key5, newValue (8, "next5")
>>>>>>>>>
>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>
>>>>>>>>> Thanks a lot !!!!
>>>>>>>>> Abhi
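[Editor's note] For readers following the thread: the prev/next pairing in Abhi's example can be sketched without Spark as a plain map from key to the pending "prev" record. In the real job that map is the mapWithState state, and the two-hour expiry would come from a StateSpec timeout rather than anything shown here. PairingDemo, feed, and the output format are illustrative names for this sketch, not code from the thread.

```java
import java.util.HashMap;
import java.util.Map;

class PairingDemo {
    // Pending "prev" records: key -> the int field a of the prev value.
    private final Map<String, Integer> pending = new HashMap<>();

    // Feed one (key, a, b) record. Returns the combined output record
    // "key,newValue(a,\"b\")" when a prev/next pair completes, else null.
    String feed(String key, int a, String b) {
        if (b.startsWith("prev")) {
            pending.put(key, a);          // first sighting: remember it
            return null;
        }
        Integer prevA = pending.remove(key);
        if (prevA == null) {
            return null;                  // a "next" with no stored "prev"
        }
        // Keep field a from the earlier record, field b from the later
        // one -- the "swap" described in the thread.
        return key + ",newValue(" + prevA + ",\"" + b + "\")";
    }
}
```

Fed the records from the t=1 and t=3 batches above, this reproduces the expected outputs, e.g. key2 pairs within the first batch while key0 pairs only when its "next" arrives later.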