Hi Ryan,

It's not working even after removing the reduceByKey.

So, basically I am doing the following:

- reading from Kafka
- flatMap inside transform
- mapWithState
- rdd.count on the output of mapWithState

But to my surprise I still don't see checkpointing taking place.

Is there any restriction on the type of operation that we can perform inside mapWithState?

I really need to resolve this one, as currently if my application is restarted from checkpoint it has to repartition the 120 previous stages, which takes a hell of a lot of time.

Thanks !!
Abhi

On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Sorry that I forgot to tell you that you should also call `rdd.count()`
> for "reduceByKey" as well. Could you try it and see if it works?
>
> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> I am using mapWithState after doing reduceByKey.
>>
>> I am right now using mapWithState as you suggested and triggering the
>> count manually.
>>
>> But I am still unable to see any checkpointing taking place. In the DAG I can
>> see that the reduceByKey operations for the previous batches are also being
>> computed.
>>
>>
>> Thanks
>> Abhi
>>
>>
>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Using reduceByKeyAndWindow and mapWithState will trigger the bug
>>> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>>>
>>> JavaMapWithStateDStream<...> stateDStream =
>>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>> stateDStream.foreachRDD(new Function1<...>() {
>>>   @Override
>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>>     rdd.count();   // force the batch to be computed so the checkpoint is triggered
>>>     return null;   // a Void return type needs an explicit return
>>>   }
>>> });
>>> return stateDStream.stateSnapshots();
>>>
>>>
>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> Reposting the code.
>>>>
>>>> Basically my use case is something like this: I am receiving the web
>>>> impression logs and may get the notify (listening from Kafka) for those
>>>> impressions in the same interval (for me it's 1 min) or any later interval
>>>> (up to 2 hours). Now, when I receive the notify for a particular impression
>>>> I need to swap the date field in the impression with the date field in the
>>>> notify log. The notify for an impression has the same key as the impression.
>>>>
>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>     new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>         Tuple2<String, MyClass>>() {
>>>>   @Override
>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>       State<MyClass> state) {
>>>>     MyClass nullObj = new MyClass();
>>>>     nullObj.setImprLog(null);
>>>>     nullObj.setNotifyLog(null);
>>>>     MyClass current = one.or(nullObj);
>>>>
>>>>     if (current != null && current.getImprLog() != null
>>>>         && current.getMyClassType() == 1 /* this is impression */) {
>>>>       return new Tuple2<>(key, null);
>>>>     }
>>>>     else if (current.getNotifyLog() != null
>>>>         && current.getMyClassType() == 3 /* notify for the impression received */) {
>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>         // swapping the dates
>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>         return new Tuple2<>(key, oldState);
>>>>       }
>>>>       else {
>>>>         return new Tuple2<>(key, null);
>>>>       }
>>>>     }
>>>>     else {
>>>>       return new Tuple2<>(key, null);
>>>>     }
>>>>   }
>>>> };
>>>>
>>>>
>>>> return
>>>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>
>>>>
>>>> Currently I am using reduceByKeyAndWindow without the inverse function
>>>> and I am able to get the correct data. But the issue that might arise is
>>>> when I have to restart my application from checkpoint and it repartitions
>>>> and computes the previous 120 partitions, which delays the incoming batches.
>>>>
>>>>
>>>> Thanks !!
>>>> Abhi
>>>>
>>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>>>> shixi...@databricks.com> wrote:
>>>>
>>>>> Hey Abhi,
>>>>>
>>>>> Could you post how you use mapWithState? By default, it should do
>>>>> checkpointing every 10 batches.
>>>>> However, there is a known issue that prevents mapWithState from
>>>>> checkpointing in some special cases:
>>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>>
>>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <
>>>>> abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> Any insights on this one?
>>>>>>
>>>>>>
>>>>>> Thanks !!!
>>>>>> Abhi
>>>>>>
>>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>
>>>>>>> I am now trying to use mapWithState in the following way, based on some
>>>>>>> example code. But, looking at the DAG, it does not seem to checkpoint
>>>>>>> the state, and when restarting the application from checkpoint it
>>>>>>> re-partitions all the previous batches' data from Kafka.
>>>>>>>
>>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>>         Tuple2<String, MyClass>>() {
>>>>>>>   @Override
>>>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>>>       State<MyClass> state) {
>>>>>>>     MyClass nullObj = new MyClass();
>>>>>>>     nullObj.setImprLog(null);
>>>>>>>     nullObj.setNotifyLog(null);
>>>>>>>     MyClass current = one.or(nullObj);
>>>>>>>
>>>>>>>     if (current != null && current.getImprLog() != null
>>>>>>>         && current.getMyClassType() == 1) {
>>>>>>>       return new Tuple2<>(key, null);
>>>>>>>     }
>>>>>>>     else if (current.getNotifyLog() != null
>>>>>>>         && current.getMyClassType() == 3) {
>>>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>>         return new Tuple2<>(key, oldState);
>>>>>>>       }
>>>>>>>       else {
>>>>>>>         return new Tuple2<>(key, null);
>>>>>>>       }
>>>>>>>     }
>>>>>>>     else {
>>>>>>>       return new Tuple2<>(key, null);
>>>>>>>     }
>>>>>>>   }
>>>>>>> };
>>>>>>>
>>>>>>>
>>>>>>> Please suggest if this is the proper way or am I doing something
>>>>>>> wrong.
>>>>>>>
>>>>>>>
>>>>>>> Thanks !!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> If you don't want to upgrade, your only option will be
>>>>>>>> updateStateByKey then.
>>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> mapWithState supports checkpoint.
>>>>>>>>>
>>>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>>>> e.g.
>>>>>>>>> SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>>>> with KryoSerializer
>>>>>>>>>
>>>>>>>>> which is in the upcoming 1.6.1
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>>>
>>>>>>>>>> When my application goes down and is restarted from checkpoint,
>>>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>>>
>>>>>>>>>> Also, to use mapWithState I will need to upgrade my application,
>>>>>>>>>> as I am using version 1.4.0 and mapWithState isn't supported there.
>>>>>>>>>> Is there any other workaround?
>>>>>>>>>>
>>>>>>>>>> Cheers!!
>>>>>>>>>> Abhi
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <
>>>>>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a use case as follows in my production environment,
>>>>>>>>>>>> where I am listening from Kafka with a slideInterval of 1 min and
>>>>>>>>>>>> a windowLength of 2 hours.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a JavaPairDStream where for each key I am getting the
>>>>>>>>>>>> same key but with a different value, which might appear in the same
>>>>>>>>>>>> batch or some later batch.
>>>>>>>>>>>>
>>>>>>>>>>>> When the key appears a second time I need to update a field in the
>>>>>>>>>>>> value of the previous key with a field in the later key. The keys for
>>>>>>>>>>>> which the combination keys do not come should be rejected after 2 hours.
>>>>>>>>>>>>
>>>>>>>>>>>> At the end of each second I need to output the result to an
>>>>>>>>>>>> external database.
>>>>>>>>>>>>
>>>>>>>>>>>> For example:
>>>>>>>>>>>>
>>>>>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b
>>>>>>>>>>>>
>>>>>>>>>>>> At t=1 sec I am getting
>>>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>>>> key1,value1(1,"prev1")
>>>>>>>>>>>> key2,value2(2,"prev2")
>>>>>>>>>>>> key2,value3(3,"next2")
>>>>>>>>>>>>
>>>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>>>> key2,newValue(2,"next2")
>>>>>>>>>>>>
>>>>>>>>>>>> At t=2 sec getting
>>>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>>>
>>>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>>>
>>>>>>>>>>>> At t=3 sec
>>>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>>>>
>>>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>>>> Abhi
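
For reference, the pairing rule in the original question (hold the first value seen for a key, and when the key appears a second time emit the first value's `a` with the later value's `b`, dropping unmatched keys after 2 hours) can be sketched in plain Java without Spark. This is only an illustration of the per-key state logic under those assumptions, not the mapWithState code from the thread, and it says nothing about checkpointing. The names `PairingSketch`, `Value`, `kv` and `processBatch` are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the per-key pairing logic (hypothetical names, no Spark). */
final class PairingSketch {

    /** Stand-in for MyClass from the question: fields int a, String b. */
    static final class Value {
        final int a;
        final String b;
        Value(int a, String b) { this.a = a; this.b = b; }
        @Override public String toString() { return "(" + a + "," + b + ")"; }
    }

    /** A first occurrence that is still waiting for its partner. */
    private static final class Pending {
        final Value value;
        final long arrivedAtMillis;
        Pending(Value value, long arrivedAtMillis) {
            this.value = value;
            this.arrivedAtMillis = arrivedAtMillis;
        }
    }

    private final Map<String, Pending> state = new HashMap<>();
    private final long timeoutMillis;

    PairingSketch(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    /** Convenience helper to build one (key, value) record. */
    static Map.Entry<String, Value> kv(String key, int a, String b) {
        return new AbstractMap.SimpleImmutableEntry<>(key, new Value(a, b));
    }

    /** Processes one batch and returns the records to write out, in arrival order. */
    List<String> processBatch(long nowMillis, List<Map.Entry<String, Value>> batch) {
        // Drop keys whose partner did not arrive within the timeout.
        state.values().removeIf(p -> nowMillis - p.arrivedAtMillis >= timeoutMillis);

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Value> record : batch) {
            Pending previous = state.remove(record.getKey());
            if (previous == null) {
                // First occurrence: remember it and emit nothing.
                state.put(record.getKey(), new Pending(record.getValue(), nowMillis));
            } else {
                // Second occurrence: keep a from the earlier value, take b from the later one.
                out.add(record.getKey() + "," + new Value(previous.value.a, record.getValue().b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch(2L * 60 * 60 * 1000); // 2 hours
        System.out.println(sketch.processBatch(1000L, Arrays.asList(
                kv("key0", 0, "prev0"), kv("key1", 1, "prev1"),
                kv("key2", 2, "prev2"), kv("key2", 3, "next2"))));
        System.out.println(sketch.processBatch(2000L, Arrays.asList(
                kv("key3", 4, "prev3"), kv("key1", 5, "next1"))));
    }
}
```

In mapWithState terms, the `state` map here plays the role of `State<MyClass>` and the eviction step plays the role of a state timeout; how those map onto the real API is left to the Spark documentation.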