I am not familiar with your code.

bq. and then create the rdd

I assume you call ObjectOutputStream.close() prior to the above step.

Cheers

On Mon, Aug 31, 2015 at 9:42 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

Sure .. here it is (scroll below to see the NotSerializableException). Note that upstream I load the (user, item, rating) data from a file using ObjectInputStream, do some calculations that I put in a map, and then create the RDD used in the code above from that map. I even tried checkpointing the RDD and persisting it to break any lineage to the original ObjectInputStream (in case that was what was happening) -

    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC.<init>(<console>:52)
        at $iwC.<init>(<console>:54)
        at <init>(<console>:56)
        at .<init>(<console>:60)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
        at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: java.io.ObjectInputStream
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        ...
        ...

On Mon, Aug 31, 2015 at 12:23 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Ashish:
Can you post the complete stack trace for the NotSerializableException?

Cheers

On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

bcItemsIdx is just a broadcast variable constructed out of Array[(String)] .. it holds the item ids and I use it for indexing the MatrixEntry objects.

On Mon, Aug 31, 2015 at 10:41 AM, Sean Owen <so...@cloudera.com> wrote:

It's not clear; that error is different still and somehow suggests you're serializing a stream somewhere. I'd look at what's inside bcItemsIdx, as that is not shown here.
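
[Editor's note: the NotSerializableException above is the kind of error that appears when a java.io stream is still reachable from something the closure captures; in the shell, a val defined on an earlier REPL line can be pulled in transparently. A minimal sketch of one way to keep the stream out of reach, assuming a hypothetical ratings.obj file whose payload is a Vector[(String, String, Double)] -- the file name, payload type, and loadRatings helper are illustrations, not from the thread:

    import java.io.{FileInputStream, ObjectInputStream}

    // Hypothetical loader: confines the ObjectInputStream to a method so that
    // no REPL-level val ever refers to it.
    def loadRatings(path: String): Vector[(String, String, Double)] = {
      val in = new ObjectInputStream(new FileInputStream(path))
      try {
        // Deserialize into a plain, serializable Scala collection ...
        in.readObject().asInstanceOf[Vector[(String, String, Double)]]
      } finally {
        in.close() // ... and close the stream before anything else runs
      }
    }

    val ratings = loadRatings("ratings.obj")
    val rdd = sc.makeRDD(ratings) // later closures can only capture the collection, not the stream

Built this way, the stream is unreachable after loadRatings returns, so a task closure cannot accidentally include it.]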
On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

Sean,

Thanks for your comments. What I was really trying to do was to transform an RDD[(userid, itemid, rating)] into a RowMatrix so that I can do some column-similarity calculations while exploring the data, before building some models. To do that, though, I first need to convert the user and item ids into their respective indexes, for which I intended to pass an array into the closure; that is where I got stuck with this StackOverflowError while trying to figure out where it was happening. The actual error I got was slightly different (Caused by: java.io.NotSerializableException: java.io.ObjectInputStream). I started investigating that issue, which led me to the earlier code snippet I had posted. This is again because of the bcItemsIdx variable being passed into the closure -- the code below works if I don't pass in the variable and simply use a constant like 10 in its place.

The code thus far -

    // rdd below is RDD[(String,String,Double)]
    // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
    val gRdd = rdd.map{ case (user, item, rating) => ((user), (item, rating)) }.groupByKey
    val idxRdd = gRdd.zipWithIndex
    val cm = new CoordinateMatrix(
      idxRdd.flatMap[MatrixEntry](e => {
        e._1._2.map(item => {
          MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2)  // <- This is where I get the serialization error passing in the index
          // MatrixEntry(e._2, 10, item._2)  // <- This works
        })
      })
    )
    val rm = cm.toRowMatrix
    val simMatrix = rm.columnSimilarities()

I would like to make this work in the Spark shell as I am still exploring the data. Let me know if there is an alternate way of constructing the RowMatrix.

Thanks, and appreciate all the help!

Ashish

On Mon, Aug 31, 2015 at 3:41 AM, Sean Owen <so...@cloudera.com> wrote:

Yeah, I see that now. I think it fails immediately because the map operation does try to clean and/or verify the serialization of the closure up front.

I'm not quite sure what is going on, but I think it's some strange interaction between how you're building up the list, what the resulting representation happens to be, and how the closure cleaner works, which can't be perfect. The shell also introduces an extra layer of issues.

For example, the slightly more canonical approaches work fine:

    import scala.collection.mutable.MutableList
    val lst = MutableList[(String,String,Double)]()
    (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))

or just

    val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))

If you just need this to work, maybe those are better alternatives anyway. You can also check whether it works without the shell, as I suspect that's a factor.

It's not an error in Spark per se; it is saying that something's default Java serialization graph is very deep, so the code you wrote plus the closure cleaner ends up pulling in some huge linked list and serializing it the direct and unuseful way.

If you have an idea about exactly why it's happening you can open a JIRA, but arguably it's something that would be nice to just work yet isn't to do with Spark per se. Or, have a look at other issues related to the closure cleaner and the shell; you may find this is related to other known behavior.

On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

Sean .. does the code below work for you in the Spark shell? Ted got the same error -

    val a=10
    val lst = MutableList[(String,String,Double)]()
    Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
    sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)

-Ashish

On Sun, Aug 30, 2015 at 2:52 PM, Sean Owen <so...@cloudera.com> wrote:

I'm not sure how to reproduce it? This code does not produce an error in master.
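
[Editor's note: one alternate way to build the RowMatrix, sketched under the assumption that rdd is the RDD[(String, String, Double)] and itemIds is the Array[String] behind bcItemsIdx (both names are placeholders). It broadcasts an id-to-index Map rather than an Array, which replaces the per-entry indexOf scan with a constant-time lookup; whether it also sidesteps the serialization error reported above would still need to be verified in the shell:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    import org.apache.spark.rdd.RDD

    // itemIds: Array[String] -- the item ids previously wrapped in bcItemsIdx
    val bcItemIdx = sc.broadcast(itemIds.zipWithIndex.toMap)  // id -> column index

    val entries: RDD[MatrixEntry] =
      rdd.map { case (user, item, rating) => (user, (item, rating)) }
         .groupByKey
         .zipWithIndex
         .flatMap { case ((_, itemRatings), userIdx) =>
           itemRatings.map { case (item, rating) =>
             MatrixEntry(userIdx, bcItemIdx.value(item), rating)  // O(1) map lookup instead of indexOf
           }
         }

    val simMatrix = new CoordinateMatrix(entries).toRowMatrix.columnSimilarities()

Building the Map once on the driver also keeps the broadcast payload roughly the same size as the original array.]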
On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

Do you think I should create a JIRA?

On Sun, Aug 30, 2015 at 12:56 PM, Ted Yu <yuzhih...@gmail.com> wrote:

I got StackOverflowError as well :-(

On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

Yep .. I tried that too earlier. It doesn't make a difference. Are you able to replicate it on your side?

On Sun, Aug 30, 2015 at 12:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:

I see.

What about using the following in place of variable a?

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Cheers

On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:

@Sean - Agree that there is no action, but I still get the StackOverflowError; it's very weird.

@Ted - Variable a is just an int - val a = 10. The error happens when I try to pass a variable into the closure. The example you have above works fine since there is no variable being passed into the closure from the shell.

-Ashish

On Sun, Aug 30, 2015 at 9:55 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Using the Spark shell:

    scala> import scala.collection.mutable.MutableList
    import scala.collection.mutable.MutableList

    scala> val lst = MutableList[(String,String,Double)]()
    lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()

    scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))

    scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
    <console>:27: error: not found: value a
           val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
                                              ^

    scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27

    scala> rdd.count()
    ...
    15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
    res1: Long = 10000

Ashish:
Please refine your example to mimic more closely what your code actually did.

Thanks
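
[Editor's note: for reference, a minimal sketch of what Ted's broadcast-variable suggestion might look like in this shell session; per Ashish's reply above it did not change the outcome for him, so this is illustration rather than a confirmed fix:

    val lst = (0 until 10000).map(i => ("10", "10", i.toDouble))   // immutable construction
    val bcA = sc.broadcast(10)                                     // broadcast the constant instead of closing over a plain val
    val rdd = sc.makeRDD(lst).map(i => if (bcA.value == 10) 1 else 0)
    rdd.count()

Broadcast variables mainly avoid re-shipping large read-only data with every task; they do not, by themselves, change what the closure cleaner tries to serialize from the shell.]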
>>>> >> >> >>>>>> >>>> >> >> >>>>>> Thanks >>>> >> >> >>>>>> >>>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen < >>>> so...@cloudera.com> >>>> >> >> >>>>>> wrote: >>>> >> >> >>>>>>> >>>> >> >> >>>>>>> That can't cause any error, since there is no action in >>>> your >>>> >> >> >>>>>>> first >>>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an >>>> >> >> >>>>>>> error. >>>> >> >> >>>>>>> You >>>> >> >> >>>>>>> must be executing something different. >>>> >> >> >>>>>>> >>>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty >>>> >> >> >>>>>>> <ashish.shro...@gmail.com> >>>> >> >> >>>>>>> wrote: >>>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and >>>> I have >>>> >> >> >>>>>>> > a >>>> >> >> >>>>>>> > simple >>>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects >>>> in it. >>>> >> >> >>>>>>> > I >>>> >> >> >>>>>>> > get >>>> >> >> >>>>>>> > a >>>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following >>>> code >>>> >> >> >>>>>>> > (the >>>> >> >> >>>>>>> > code >>>> >> >> >>>>>>> > itself is just representative of other logic where I >>>> need to >>>> >> >> >>>>>>> > pass >>>> >> >> >>>>>>> > in a >>>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but >>>> no luck >>>> >> >> >>>>>>> > .. >>>> >> >> >>>>>>> > missing >>>> >> >> >>>>>>> > something basic here - >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>) >>>> >> >> >>>>>>> > val a=10 >>>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0) >>>> >> >> >>>>>>> > This throws - >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > java.lang.StackOverflowError >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) >>>> >> >> >>>>>>> > at >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) >>>> >> >> >>>>>>> > ... >>>> >> >> >>>>>>> > ... >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > More experiments .. 
this works - >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > val lst = >>>> Range(0,10000).map(i=>("10","10",i:Double)).toList >>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError - >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]() >>>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double))) >>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > Any help appreciated! >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > Thanks, >>>> >> >> >>>>>>> > Ashish >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > -- >>>> >> >> >>>>>>> > View this message in context: >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html >>>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list >>>> archive at >>>> >> >> >>>>>>> > Nabble.com. >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> > >>>> --------------------------------------------------------------------- >>>> >> >> >>>>>>> > To unsubscribe, e-mail: >>>> user-unsubscr...@spark.apache.org >>>> >> >> >>>>>>> > For additional commands, e-mail: >>>> user-h...@spark.apache.org >>>> >> >> >>>>>>> > >>>> >> >> >>>>>>> >>>> >> >> >>>>>>> >>>> >> >> >>>>>>> >>>> >> >> >>>>>>> >>>> --------------------------------------------------------------------- >>>> >> >> >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> >> >> >>>>>>> For additional commands, e-mail: >>>> user-h...@spark.apache.org >>>> >> >> >>>>>>> >>>> >> >> >>>>>> >>>> >> >> >>>> >>>> >> >> >> >>>> >> >> > >>>> >>> >>
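
[Editor's note: one further variant, not from the thread, that may be worth trying when the shell's variable capture is the suspect: wrap the data construction and the map in a function, so that a is a plain method parameter rather than a val defined on a REPL line and the closure only has an Int to capture. It uses the immutable list construction the thread reports as working; whether it avoids the StackOverflowError in the 1.2.1 shell has not been verified here:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def flagRatings(sc: SparkContext, a: Int): RDD[Int] = {
      // Build the data with an immutable collection (the variant reported to work above)
      val lst = Range(0, 10000).map(i => ("10", "10", i.toDouble)).toList
      // The closure captures only the method parameter a, not a REPL line object
      sc.makeRDD(lst).map(i => if (a == 10) 1 else 0)
    }

    flagRatings(sc, 10).count()

The same wrapping can be applied to the CoordinateMatrix snippet earlier in the thread.]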