Ashish: Can you post the complete stack trace for the NotSerializableException?
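In case it helps with collecting that trace, here is a minimal sketch in plain Scala (no Spark needed). `SerializationCheck`, `describe` and `trySerialize` are hypothetical helper names made up for illustration, not part of Spark. The helper only tests the object you hand it with plain Java serialization; it does not reproduce the closure that the shell and the closure cleaner build around your variable, so a clean result here does not rule out a problem in the closure itself.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper for illustration only; not a Spark API.
object SerializationCheck {

  /** The exception followed by each of its causes, outermost first. */
  def causeChain(t: Throwable): List[Throwable] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null).toList

  /** One "class: message" line per link in the cause chain. */
  def describe(t: Throwable): List[String] =
    causeChain(t).map(c => c.getClass.getName + ": " + c.getMessage)

  /** Runs plain Java serialization on `obj`; returns the failure, if any. */
  def trySerialize(obj: AnyRef): Option[NotSerializableException] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e)
    } finally {
      out.close()
    }
  }
}
```

In the shell you could call, for example, `SerializationCheck.trySerialize(bcItemsIdx.value).foreach(e => SerializationCheck.describe(e).foreach(println))`. Starting the JVM with `-Dsun.io.serialization.extendedDebugInfo=true` should also make the NotSerializableException message list the chain of fields that led to the offending object; how to pass that option to the shell depends on your setup.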
Cheers

On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <[email protected]> wrote:

> bcItemsIdx is just a broadcast variable constructed out of Array[(String)]
> .. it holds the item ids and I use it for indexing the MatrixEntry objects
>
> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <[email protected]> wrote:
>
>> It's not clear; that error is different still and somehow suggests
>> you're serializing a stream somewhere. I'd look at what's inside
>> bcItemsIdx as that is not shown here.
>>
>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>> <[email protected]> wrote:
>> > Sean,
>> >
>> > Thanks for your comments. What I was really trying to do was to transform a
>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
>> > similarity calculations while exploring the data before building some
>> > models. But to do that I need to first convert the user and item ids into
>> > respective indexes where I intended on passing in an array into the closure,
>> > which is where I got stuck with this overflowerror trying to figure out
>> > where it is happening. The actual error I got was slightly different (Caused
>> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I started
>> > investigating this issue which led me to the earlier code snippet that I had
>> > posted. This is again because of the bcItemsIdx variable being passed into
>> > the closure. Below code works if I don't pass in the variable and use simply
>> > a constant like 10 in its place ..
>> > The code thus far -
>> >
>> > // rdd below is RDD[(String,String,Double)]
>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of item ids
>> > val gRdd = rdd.map{case(user,item,rating) => ((user),(item,rating))}.groupByKey
>> > val idxRdd = gRdd.zipWithIndex
>> > val cm = new CoordinateMatrix(
>> >   idxRdd.flatMap[MatrixEntry](e => {
>> >     e._1._2.map(item=> {
>> >       MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2) // <- This is where I get the Serialization error passing in the index
>> >       // MatrixEntry(e._2, 10, item._2) // <- This works
>> >     })
>> >   })
>> > )
>> > val rm = cm.toRowMatrix
>> > val simMatrix = rm.columnSimilarities()
>> >
>> > I would like to make this work in the Spark shell as I am still exploring
>> > the data. Let me know if there is an alternate way of constructing the
>> > RowMatrix.
>> >
>> > Thanks and appreciate all the help!
>> >
>> > Ashish
>> >
>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <[email protected]> wrote:
>> >>
>> >> Yeah I see that now. I think it fails immediately because the map
>> >> operation does try to clean and/or verify the serialization of the
>> >> closure upfront.
>> >>
>> >> I'm not quite sure what is going on, but I think it's some strange
>> >> interaction between how you're building up the list and what the
>> >> resulting representation happens to be like, and how the closure
>> >> cleaner works, which can't be perfect. The shell also introduces an
>> >> extra layer of issues.
>> >>
>> >> For example, the slightly more canonical approaches work fine:
>> >>
>> >> import scala.collection.mutable.MutableList
>> >> val lst = MutableList[(String,String,Double)]()
>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>> >>
>> >> or just
>> >>
>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>> >>
>> >> If you just need this to work, maybe those are better alternatives anyway.
>> >> You can also check whether it works without the shell, as I suspect
>> >> that's a factor.
>> >>
>> >> It's not an error in Spark per se but saying that something's default
>> >> Java serialization graph is very deep, so it's like the code you wrote
>> >> plus the closure cleaner ends up pulling in some huge linked list and
>> >> serializing it the direct and unuseful way.
>> >>
>> >> If you have an idea about exactly why it's happening you can open a
>> >> JIRA, but arguably it's something that's nice to just work but isn't
>> >> to do with Spark per se. Or, have a look at others related to the
>> >> closure and shell and you may find this is related to other known
>> >> behavior.
>> >>
>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>> >> <[email protected]> wrote:
>> >> > Sean .. does the code below work for you in the Spark shell? Ted got the
>> >> > same error -
>> >> >
>> >> > val a=10
>> >> > val lst = MutableList[(String,String,Double)]()
>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >
>> >> > -Ashish
>> >> >
>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <[email protected]> wrote:
>> >> >>
>> >> >> I'm not sure how to reproduce it? this code does not produce an error
>> >> >> in master.
>> >> >>
>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> >> >> <[email protected]> wrote:
>> >> >> > Do you think I should create a JIRA?
>> >> >> >
>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <[email protected]> wrote:
>> >> >> >>
>> >> >> >> I got StackOverFlowError as well :-(
>> >> >> >>
>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> >> >> <[email protected]> wrote:
>> >> >> >>>
>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
>> >> >> >>> able to replicate on your side?
>> >> >> >>>
>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <[email protected]> wrote:
>> >> >> >>>>
>> >> >> >>>> I see.
>> >> >> >>>>
>> >> >> >>>> What about using the following in place of variable a ?
>> >> >> >>>>
>> >> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >> >> >>>>
>> >> >> >>>> Cheers
>> >> >> >>>>
>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >> >> >>>> <[email protected]> wrote:
>> >> >> >>>>>
>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>> >> >> >>>>> stackoverflowerror, its very weird
>> >> >> >>>>>
>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>> >> >> >>>>> when I try to pass a variable into the closure. The example you have
>> >> >> >>>>> above works fine since there is no variable being passed into the
>> >> >> >>>>> closure from the shell.
>> >> >> >>>>>
>> >> >> >>>>> -Ashish
>> >> >> >>>>>
>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <[email protected]> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Using Spark shell :
>> >> >> >>>>>>
>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>> >> >> >>>>>> import scala.collection.mutable.MutableList
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()
>> >> >> >>>>>>
>> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>> <console>:27: error: not found: value a
>> >> >> >>>>>> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>> ^
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
>> >> >> >>>>>>
>> >> >> >>>>>> scala> rdd.count()
>> >> >> >>>>>> ...
>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
>> >> >> >>>>>> res1: Long = 10000
>> >> >> >>>>>>
>> >> >> >>>>>> Ashish:
>> >> >> >>>>>> Please refine your example to mimic more closely what your code
>> >> >> >>>>>> actually did.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks
>> >> >> >>>>>>
>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <[email protected]> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>> That can't cause any error, since there is no action in your first
>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
>> >> >> >>>>>>> You must be executing something different.
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>> >> >> >>>>>>> <[email protected]> wrote:
>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a simple
>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following code (the code
>> >> >> >>>>>>> > itself is just representative of other logic where I need to pass in a
>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no luck .. missing
>> >> >> >>>>>>> > something basic here -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>> >> >> >>>>>>> > val a=10
>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >> >> >>>>>>> > This throws -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > java.lang.StackOverflowError
>> >> >> >>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> >     ...
>> >> >> >>>>>>> >     ...
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > More experiments .. this works -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Any help appreciated!
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Thanks,
>> >> >> >>>>>>> > Ashish
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > --
>> >> >> >>>>>>> > View this message in context:
>> >> >> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > ---------------------------------------------------------------------
>> >> >> >>>>>>> > To unsubscribe, e-mail: [email protected]
>> >> >> >>>>>>> > For additional commands, e-mail: [email protected]
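
On the indexing step discussed in this thread (turning item ids into column indexes for the matrix entries), one option is to build the id-to-index lookup once instead of calling `indexOf` on the array for every rating. The following is a minimal sketch in plain Scala, without Spark, so it runs anywhere. `ItemIndex`, `build` and `entries` are hypothetical names chosen for illustration. Whether this avoids the serialization error in the shell is an assumption that has not been verified here; it only addresses the per-entry lookup.

```scala
// Hypothetical helper for illustration only; not part of Spark or MLlib.
object ItemIndex {

  /** Builds the id -> column index lookup once. If an id occurs more than
    * once, the last position wins (indexOf would return the first). */
  def build(itemIds: Seq[String]): Map[String, Int] =
    itemIds.zipWithIndex.toMap

  /** Turns one user's (item, rating) pairs into (row, column, value) triples.
    * Items missing from the index are skipped, whereas indexOf would yield -1. */
  def entries(row: Long,
              ratings: Iterable[(String, Double)],
              index: Map[String, Int]): List[(Long, Long, Double)] =
    ratings.toList.flatMap { case (item, rating) =>
      index.get(item).map(col => (row, col.toLong, rating))
    }
}
```

With Spark, the map would presumably be broadcast in place of the array, and each triple would become a `MatrixEntry(row, col, value)` inside the `flatMap` from the snippet earlier in the thread.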
