Could you post a screenshot of the Streaming DAG and also the driver log? It would be great if you could share a simple producer so we can debug this.
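The figure of 120 that keeps recurring in the quoted messages below follows from the settings in the original question. A windowLength of 2 hours with a slideInterval of 1 minute spans 120 batches, assuming the batch interval equals the slide interval. The Java sketch below is a toy model, not Spark's recovery code. It assumes that after a restart the newest state RDD must be rebuilt from its lineage, and that this lineage reaches back to the last checkpoint, or to the start if no checkpoint was ever written. The class WindowBatches and its methods are hypothetical names. The interval of 10 batches is the mapWithState default mentioned in the thread.

```java
final class WindowBatches {

    // Batches spanned by one window, assuming the batch interval equals the slide interval.
    static long batchesPerWindow(long windowMinutes, long slideMinutes) {
        if (slideMinutes <= 0 || windowMinutes % slideMinutes != 0) {
            throw new IllegalArgumentException("window must be a positive multiple of the slide");
        }
        return windowMinutes / slideMinutes;
    }

    // Toy model (an assumption, not Spark's recovery code): batches of lineage behind the
    // newest state RDD. checkpointEvery <= 0 models "never checkpointed", so the whole
    // history must be replayed; otherwise the lineage is capped by the checkpoint interval.
    static long replayBound(long batchesSinceStart, long checkpointEvery) {
        if (checkpointEvery <= 0) {
            return batchesSinceStart;
        }
        return Math.min(batchesSinceStart, checkpointEvery);
    }

    public static void main(String[] args) {
        long window = batchesPerWindow(2 * 60, 1);   // 2-hour window, 1-minute slide
        System.out.println(window);                  // 120
        System.out.println(replayBound(window, 0));  // 120: no state checkpoint written
        System.out.println(replayBound(window, 10)); // 10: checkpoint every 10 batches
    }
}
```

Under these assumptions, a state stream that is never checkpointed keeps the whole 120-batch history in its lineage. A checkpoint every 10 batches caps that lineage at 10 batches, which is why a checkpoint that actually fires matters for restart time.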
On Mon, Feb 29, 2016 at 1:39 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
> Hi Ryan,
>
> It's not working even after removing the reduceByKey.
>
> So, basically I am doing the following:
> - reading from Kafka
> - flatMap inside transform
> - mapWithState
> - rdd.count on the output of mapWithState
>
> But, to my surprise, I still don't see checkpointing taking place.
>
> Is there any restriction on the type of operations that we can perform inside mapWithState?
>
> I really need to resolve this, because currently, if my application is restarted from the checkpoint, it has to repartition 120 previous stages, which takes a hell of a lot of time.
>
> Thanks!!
> Abhi
>
> On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> Sorry, I forgot to tell you that you should also call `rdd.count()` for "reduceByKey" as well. Could you try it and see if it works?
>>
>> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> I am using mapWithState after doing reduceByKey.
>>>
>>> Right now I am using mapWithState as you suggested and triggering the count manually.
>>>
>>> But I am still unable to see any checkpointing taking place. In the DAG I can see that the reduceByKey operation for the previous batches is also being computed.
>>>
>>> Thanks
>>> Abhi
>>>
>>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Using reduceByKeyAndWindow and mapWithState will trigger the bug in SPARK-6847.
>>>> Here is a workaround to trigger the checkpoint manually:
>>>>
>>>> JavaMapWithStateDStream<...> stateDStream =
>>>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>>> stateDStream.foreachRDD(new Function<...>() {
>>>>   @Override
>>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>>>     rdd.count(); // run an action on every batch so checkpointing of the state stream is triggered
>>>>     return null;
>>>>   }
>>>> });
>>>> return stateDStream.stateSnapshots();
>>>>
>>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Reposting the code.
>>>>>
>>>>> Basically, my use case is something like this: I am receiving the web impression logs and may get the notify (listening from Kafka) for those impressions in the same interval (for me it's 1 min) or in any later interval (up to 2 hours). When I receive the notify for a particular impression, I need to swap the date field in the impression with the date field in the notify log. The notify for an impression has the same key as the impression.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>   @Override
>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>>>     MyClass nullObj = new MyClass();
>>>>>     nullObj.setImprLog(null);
>>>>>     nullObj.setNotifyLog(null);
>>>>>     MyClass current = one.or(nullObj);
>>>>>
>>>>>     if (current != null && current.getImprLog() != null && current.getMyClassType() == 1 /* this is impression */) {
>>>>>       return new Tuple2<>(key, null);
>>>>>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3 /* notify for the impression received */) {
>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd()); // swapping the dates
>>>>>         return new Tuple2<>(key, oldState);
>>>>>       } else {
>>>>>         return new Tuple2<>(key, null);
>>>>>       }
>>>>>     } else {
>>>>>       return new Tuple2<>(key, null);
>>>>>     }
>>>>>   }
>>>>> };
>>>>>
>>>>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>>
>>>>> Currently I am using reduceByKeyAndWindow without the inverse function, and I am able to get the correct data. But the issue that might arise is when I have to restart my application from the checkpoint: it repartitions and computes the previous 120 partitions, which delays the incoming batches.
>>>>>
>>>>> Thanks!!
>>>>> Abhi
>>>>>
>>>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>>>
>>>>>> Hey Abhi,
>>>>>>
>>>>>> Could you post how you use mapWithState? By default, it should do checkpointing every 10 batches. However, there is a known issue that prevents mapWithState from checkpointing in some special cases:
>>>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>>>
>>>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>>
>>>>>>> Any insights on this one?
>>>>>>>
>>>>>>> Thanks!!!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am now trying to use mapWithState in the following way, based on some example code. But, looking at the DAG, it does not seem to checkpoint the state, and when restarting the application from the checkpoint, it re-partitions all the previous batches' data from Kafka.
>>>>>>>>
>>>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>>>>   @Override
>>>>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>>>>>>     MyClass nullObj = new MyClass();
>>>>>>>>     nullObj.setImprLog(null);
>>>>>>>>     nullObj.setNotifyLog(null);
>>>>>>>>     MyClass current = one.or(nullObj);
>>>>>>>>
>>>>>>>>     if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>>>>>>>>       return new Tuple2<>(key, null);
>>>>>>>>     } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>>>         return new Tuple2<>(key, oldState);
>>>>>>>>       } else {
>>>>>>>>         return new Tuple2<>(key, null);
>>>>>>>>       }
>>>>>>>>     } else {
>>>>>>>>       return new Tuple2<>(key, null);
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> };
>>>>>>>>
>>>>>>>> Please suggest whether this is the proper way or whether I am doing something wrong.
>>>>>>>>
>>>>>>>> Thanks!!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If you don't want to upgrade, your only option will be updateStateByKey then.
>>>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> mapWithState supports checkpointing.
>>>>>>>>>>
>>>>>>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>>>>>> SPARK-12591 NullPointerException using checkpointed mapWithState with KryoSerializer
>>>>>>>>>>
>>>>>>>>>> which is in the upcoming 1.6.1.
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>>>>
>>>>>>>>>>> When my application goes down and is restarted from the checkpoint, will mapWithState need to recompute the previous batches' data?
>>>>>>>>>>>
>>>>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as I am using version 1.4.0 and mapWithState isn't supported there. Is there any other workaround?
>>>>>>>>>>>
>>>>>>>>>>> Cheers!!
>>>>>>>>>>> Abhi
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a use case like the following in my production environment, where I am listening from Kafka with a slideInterval of 1 min and a windowLength of 2 hours.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a JavaPairDStream where the same key can arrive again with a different value, which might appear in the same batch or in some later batch.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When the key appears the second time, I need to update a field in the value of the earlier record with a field from the later one. Keys whose matching second record never arrives should be rejected after 2 hours.
>>>>>>>>>>>>>
>>>>>>>>>>>>> At the end of each second I need to output the result to an external database.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b.
>>>>>>>>>>>>> At t=1 sec I am getting
>>>>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>>>>> key1,value1(1,"prev1")
>>>>>>>>>>>>> key2,value2(2,"prev2")
>>>>>>>>>>>>> key2,value3(3,"next2")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>>>>> key2,newValue(2,"next2")
>>>>>>>>>>>>>
>>>>>>>>>>>>> At t=2 sec getting
>>>>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>>>>
>>>>>>>>>>>>> At t=3 sec
>>>>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>>>>> key5,value8(8,"prev5")
>>>>>>>>>>>>> key5,value9(9,"next5")
>>>>>>>>>>>>> key0,value10(10,"next0")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot!!!!
>>>>>>>>>>>>> Abhi
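The pairing rule in the example above can be checked without Spark. The following is a minimal Java sketch under a few assumptions: one pending record per key, and a 2-hour timeout measured in the same seconds as the example's t values. The class PairingSketch, its Value type, and processBatch are hypothetical names. A HashMap stands in for the per-key State that mapWithState would keep, and the comments mark where state.update(...) and state.remove() would sit in a mapWithState function. It illustrates the logic only and is not a drop-in Spark implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class PairingSketch {

    // Hypothetical value type mirroring the example's MyClass (int a, String b).
    static final class Value {
        final int a;
        final String b;

        Value(int a, String b) { this.a = a; this.b = b; }

        @Override
        public String toString() { return "(" + a + ",\"" + b + "\")"; }
    }

    // First value seen for a key, plus the batch time it arrived (used for expiry).
    private static final class Pending {
        final Value value;
        final long arrivedAt;

        Pending(Value value, long arrivedAt) { this.value = value; this.arrivedAt = arrivedAt; }
    }

    // Stand-in for Spark's per-key State: at most one pending value per key.
    private final Map<String, Pending> state = new HashMap<>();
    private final long timeoutSeconds;

    PairingSketch(long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; }

    static Map.Entry<String, Value> rec(String key, int a, String b) {
        return new AbstractMap.SimpleEntry<>(key, new Value(a, b));
    }

    // Processes the records of one batch arriving at time t (seconds) and returns
    // the merged "key,(a,"b")" lines that would be written to the database.
    List<String> processBatch(long t, List<Map.Entry<String, Value>> batch) {
        // Reject keys whose second record did not arrive within the timeout.
        Iterator<Pending> it = state.values().iterator();
        while (it.hasNext()) {
            if (t - it.next().arrivedAt > timeoutSeconds) {
                it.remove();
            }
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Value> record : batch) {
            Pending first = state.remove(record.getKey()); // mapWithState: state.remove()
            if (first == null) {
                // First occurrence: remember it (mapWithState: state.update(...)).
                state.put(record.getKey(), new Pending(record.getValue(), t));
            } else {
                // Second occurrence: keep the earlier 'a', take the later 'b'.
                Value merged = new Value(first.value.a, record.getValue().b);
                output.add(record.getKey() + "," + merged);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch(2 * 60 * 60); // 2-hour timeout
        System.out.println(sketch.processBatch(1, Arrays.asList(
                rec("key0", 0, "prev0"), rec("key1", 1, "prev1"),
                rec("key2", 2, "prev2"), rec("key2", 3, "next2"))));
        System.out.println(sketch.processBatch(2, Arrays.asList(
                rec("key3", 4, "prev3"), rec("key1", 5, "next1"))));
        System.out.println(sketch.processBatch(3, Arrays.asList(
                rec("key4", 6, "prev4"), rec("key3", 7, "next3"),
                rec("key5", 8, "prev5"), rec("key5", 9, "next5"),
                rec("key0", 10, "next0"))));
    }
}
```

Running main prints [key2,(2,"next2")], then [key1,(1,"next1")], then [key3,(4,"next3"), key5,(8,"next5"), key0,(0,"next0")]. These are the same pairs as the expected database output, listed in arrival order. The first record of each key is stored before any pair is emitted. A mapping function that never stores the first record will not find it when the second one arrives.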