Hey Ted,

Because the fix for SPARK-6847 changes the semantics of Streaming checkpointing, it won't be backported to branch-1.6.
A workaround is to call `count` to trigger the checkpoint manually. For example:

val dstream = ... // dstream is an operator needing to be checkpointed.
dstream.foreachRDD(rdd => rdd.count())

On Mon, Feb 22, 2016 at 12:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Fix for SPARK-6847 is not in branch-1.6
>
> Should the fix be ported to branch-1.6 ?
>
> Thanks
>
> On Feb 22, 2016, at 11:55 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>
>>> I am now trying to use mapWithState in the following way using some
>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>> the state and when restarting the application from checkpoint, it
>>> re-partitions all the previous batches data from kafka.
>>>
>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>       @Override
>>>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>         MyClass nullObj = new MyClass();
>>>         nullObj.setImprLog(null);
>>>         nullObj.setNotifyLog(null);
>>>         MyClass current = one.or(nullObj);
>>>
>>>         if(current!= null && current.getImprLog() != null && current.getMyClassType() == 1){
>>>           return new Tuple2<>(key, null);
>>>         }
>>>         else if (current.getNotifyLog() != null && current.getMyClassType() == 3){
>>>           MyClass oldState = (state.exists() ?
>>>               state.get() : nullObj);
>>>           if(oldState!= null && oldState.getNotifyLog() != null){
>>>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>             return new Tuple2<>(key, oldState);
>>>           }
>>>           else{
>>>             return new Tuple2<>(key, null);
>>>           }
>>>         }
>>>         else{
>>>           return new Tuple2<>(key, null);
>>>         }
>>>
>>>       }
>>>     };
>>>
>>>
>>> Please suggest if this is the proper way or am I doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>
>>>> If you don't want to update your only option will be updateStateByKey then
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There has been some bug fix since release of 1.6.0
>>>>> e.g.
>>>>> SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>> with KryoSerializer
>>>>>
>>>>> which is in the upcoming 1.6.1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> Does mapWithState checkpoints the data ?
>>>>>>
>>>>>> When my application goes down and is restarted from checkpoint, will
>>>>>> mapWithState need to recompute the previous batches data ?
>>>>>>
>>>>>> Also, to use mapWithState I will need to upgrade my application as I
>>>>>> am using version 1.4.0 and mapWithState isnt supported there. Is there any
>>>>>> other work around ?
>>>>>>
>>>>>> Cheers!!
>>>>>> Abhi
>>>>>>
>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>
>>>>>>> Looks like mapWithState could help you?
>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have an use case like follows in my production environment where
>>>>>>>> I am listening from kafka with slideInterval of 1 min and windowLength
>>>>>>>> of 2 hours.
>>>>>>>>
>>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>>> key but with different value,which might appear in the same batch or
>>>>>>>> some next batch.
>>>>>>>>
>>>>>>>> When the key appears second time I need to update a field in value
>>>>>>>> of previous key with a field in the later key. The keys for which the
>>>>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>>>>
>>>>>>>> At the end of each second I need to output the result to external
>>>>>>>> database.
>>>>>>>>
>>>>>>>> For example :
>>>>>>>>
>>>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>>>> At t=1sec I am getting
>>>>>>>> key0,value0(0,"prev0")
>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>
>>>>>>>> Output to database after 1 sec
>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>
>>>>>>>> At t=2 sec getting
>>>>>>>> key3,value4(4,"prev3")
>>>>>>>> key1,value5(5,"next1")
>>>>>>>>
>>>>>>>> Output to database after 2 sec
>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>
>>>>>>>> At t=3 sec
>>>>>>>> key4,value6(6,"prev4")
>>>>>>>> key3,value7(7,"next3")
>>>>>>>> key5,value5(8,"prev5")
>>>>>>>> key5,value5(9,"next5")
>>>>>>>> key0,value0(10,"next0")
>>>>>>>>
>>>>>>>> Output to database after 3 sec
>>>>>>>> key0,newValue(0,"next0")
>>>>>>>> key3,newValue(4,"next3")
>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>
>>>>>>>>
>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks a lot !!!!
>>>>>>>> Abhi
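
For reference, the pairing rule described in the original question can be sketched in plain Java without Spark: keep the first value seen for a key, and when the same key appears again, emit the first value's `a` together with the later value's `b`; keys whose partner never arrives are dropped after a timeout. This is only a rough illustration under those assumptions. `KeyPairing`, `Value`, `process`, `expire` and `pendingCount` are hypothetical names invented here, not Spark APIs, and timestamps are passed in explicitly rather than taken from batch times.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of Spark): pairs the first and second value seen for a key.
final class KeyPairing {
    // Mirrors the "int a, String b" fields of MyClass from the example in the question.
    static final class Value {
        final int a;
        final String b;
        Value(int a, String b) { this.a = a; this.b = b; }
        @Override public String toString() { return "(" + a + ",\"" + b + "\")"; }
    }

    // A first sighting that is still waiting for its partner.
    private static final class Pending {
        final Value value;
        final long seenAtMillis;
        Pending(Value value, long seenAtMillis) {
            this.value = value;
            this.seenAtMillis = seenAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Pending> pending = new HashMap<>();

    KeyPairing(long ttlMillis) { this.ttlMillis = ttlMillis; }

    // First sighting of a key is stored and nothing is emitted.
    // Second sighting emits (first.a, second.b) and clears the key.
    // A stored value older than the TTL is treated as if it had never arrived.
    Optional<Value> process(String key, Value incoming, long nowMillis) {
        Pending first = pending.remove(key);
        if (first == null || nowMillis - first.seenAtMillis >= ttlMillis) {
            pending.put(key, new Pending(incoming, nowMillis));
            return Optional.empty();
        }
        return Optional.of(new Value(first.value.a, incoming.b));
    }

    // Drops keys whose partner did not arrive within the TTL; returns how many were dropped.
    int expire(long nowMillis) {
        int dropped = 0;
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().seenAtMillis >= ttlMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    int pendingCount() { return pending.size(); }
}
```

With the sample data from the question, processing key2 with (2,"prev2") stores the value and emits nothing, and the later (3,"next2") for key2 emits (2,"next2"), which matches the expected output after 1 sec. In a streaming job the same per-key logic would sit in the state function, with the state updated on the first sighting and removed on the second.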