Any insights on this one?
Thanks!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:

> I am now trying to use mapWithState in the following way, based on some
> example code. But, looking at the DAG, it does not seem to checkpoint
> the state, and when the application is restarted from checkpoint, it
> re-partitions all the previous batches' data from Kafka.
>
> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>       @Override
>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>                                           State<MyClass> state) {
>         MyClass nullObj = new MyClass();
>         nullObj.setImprLog(null);
>         nullObj.setNotifyLog(null);
>         MyClass current = one.or(nullObj);
>
>         if (current != null && current.getImprLog() != null
>             && current.getMyClassType() == 1) {
>           return new Tuple2<>(key, null);
>         }
>         else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>           MyClass oldState = (state.exists() ? state.get() : nullObj);
>           if (oldState != null && oldState.getNotifyLog() != null) {
>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>             return new Tuple2<>(key, oldState);
>           }
>           else {
>             return new Tuple2<>(key, null);
>           }
>         }
>         else {
>           return new Tuple2<>(key, null);
>         }
>       }
>     };
>
>
> Please suggest whether this is the proper way or whether I am doing
> something wrong.
>
>
> Thanks !!
> Abhi
>
> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
> wrote:
>
>> If you don't want to upgrade, your only option will be updateStateByKey then
>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> mapWithState supports checkpoint.
>>>
>>> There have been some bug fixes since the release of 1.6.0
>>> e.g.
>>> SPARK-12591 NullPointerException using checkpointed mapWithState with
>>> KryoSerializer
>>>
>>> which is in the upcoming 1.6.1
>>>
>>> Cheers
>>>
>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Does mapWithState checkpoint the data?
>>>>
>>>> When my application goes down and is restarted from checkpoint, will
>>>> mapWithState need to recompute the previous batches' data?
>>>>
>>>> Also, to use mapWithState I will need to upgrade my application, as I am
>>>> using version 1.4.0 and mapWithState isn't supported there. Is there any
>>>> other workaround?
>>>>
>>>> Cheers!!
>>>> Abhi
>>>>
>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like mapWithState could help you?
>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a use case as follows in my production environment, where I
>>>>>> am listening from Kafka with a slideInterval of 1 min and a
>>>>>> windowLength of 2 hours.
>>>>>>
>>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>>> but with a different value, which might appear in the same batch or
>>>>>> in some later batch.
>>>>>>
>>>>>> When the key appears a second time I need to update a field in the
>>>>>> value of the previous key with a field from the later key. The keys
>>>>>> for which the matching key does not arrive should be rejected after
>>>>>> 2 hours.
>>>>>>
>>>>>> At the end of each second I need to output the result to an external
>>>>>> database.
>>>>>>
>>>>>> For example:
>>>>>>
>>>>>> Suppose valueX is an object of MyClass with fields int a, String b
>>>>>> At t=1 sec I am getting
>>>>>> key0,value0(0,"prev0")
>>>>>> key1,value1(1,"prev1")
>>>>>> key2,value2(2,"prev2")
>>>>>> key2,value3(3,"next2")
>>>>>>
>>>>>> Output to database after 1 sec
>>>>>> key2,newValue(2,"next2")
>>>>>>
>>>>>> At t=2 sec getting
>>>>>> key3,value4(4,"prev3")
>>>>>> key1,value5(5,"next1")
>>>>>>
>>>>>> Output to database after 2 sec
>>>>>> key1,newValue(1,"next1")
>>>>>>
>>>>>> At t=3 sec
>>>>>> key4,value6(6,"prev4")
>>>>>> key3,value7(7,"next3")
>>>>>> key5,value5(8,"prev5")
>>>>>> key5,value5(9,"next5")
>>>>>> key0,value0(10,"next0")
>>>>>>
>>>>>> Output to database after 3 sec
>>>>>> key0,newValue(0,"next0")
>>>>>> key3,newValue(4,"next3")
>>>>>> key5,newValue(8,"next5")
>>>>>>
>>>>>>
>>>>>> Please suggest how this can be achieved.
>>>>>>
>>>>>>
>>>>>> Thanks a lot !!!!
>>>>>> Abhi
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>
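
For reference, the pairing rule in the example at the bottom of the thread can be sketched in plain Java, without Spark. This is only an illustration of the intended semantics, not a mapWithState implementation: the class KeyPairingSketch, its Value type (standing in for MyClass with fields int a, String b), and the methods offer, expire, and pendingCount are all hypothetical names. It assumes the first value seen for a key is held until a second one arrives, that the merged output keeps field a from the first value and field b from the second (as in the sample output), and that unmatched keys are dropped once they are older than the timeout.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java sketch of the pairing rule from the example (no Spark involved).
// Class, method, and field names are hypothetical.
class KeyPairingSketch {

    // Stand-in for MyClass from the thread: fields int a, String b.
    static final class Value {
        final int a;
        final String b;

        Value(int a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "(" + a + ",\"" + b + "\")";
        }
    }

    // First occurrence of a key, plus the time it arrived.
    private static final class Pending {
        final Value value;
        final long seenAtMillis;

        Pending(Value value, long seenAtMillis) {
            this.value = value;
            this.seenAtMillis = seenAtMillis;
        }
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final long timeoutMillis;

    KeyPairingSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the merged value when this is the second occurrence of the key,
    // or null when the value was only stored as a first occurrence.
    Value offer(String key, Value incoming, long nowMillis) {
        Pending first = pending.remove(key);
        if (first != null && nowMillis - first.seenAtMillis < timeoutMillis) {
            // Keep field a from the earlier value, take field b from the later one.
            return new Value(first.value.a, incoming.b);
        }
        // No live first occurrence: remember this one and emit nothing.
        pending.put(key, new Pending(incoming, nowMillis));
        return null;
    }

    // Drops first occurrences that have waited at least timeoutMillis.
    // Returns how many entries were dropped.
    int expire(long nowMillis) {
        int dropped = 0;
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().seenAtMillis >= timeoutMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Feeding the t=1 sec batch through offer emits only the merged value for key2, matching the sample output. In a mapWithState version, the map would roughly correspond to the per-key State object (state.update to store the first value, state.remove once the pair is emitted) and the expiry to a timeout on the StateSpec. Note that the mapping function quoted earlier in the thread never calls state.update, so as written it would have no stored state to read back.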