I'm running into all kinds of problems with Spark 1.5.1 -- does anyone have a version that's working smoothly for them?

On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler <deanwamp...@gmail.com> wrote:

> I didn't expect that to fail. I would call it a bug for sure, since it's practically useless if this method doesn't work.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>
>> Dean,
>>
>> What's the point of Scala without magic? :-)
>>
>> Thanks for your help. It's still giving me unreliable results. There just has to be a way to do this in Spark. It's a pretty fundamental thing.
>>
>> scala> targets.takeOrdered(1) // imported as implicit here
>> res23: Array[(String, Int)] = Array()
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res24: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res25: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res26: Array[(String, Int)] = Array((\bguns?\b,1253))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res27: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>
>>> You don't have to use sortBy (although that would be better...). You have to define an Ordering object and pass it as the second argument list to takeOrdered()(), or declare it "implicit". This is more fancy Scala than Spark should require here. Here's an example I've used:
>>>
>>> // schema is (String, Int); order by the Int, descending
>>> object CountOrdering extends Ordering[(String, Int)] {
>>>   def compare(a: (String, Int), b: (String, Int)) =
>>>     -(a._2 compare b._2) // negated so that it sorts descending
>>> }
>>>
>>> myRDD.takeOrdered(100)(CountOrdering)
>>>
>>> Or, if you add the keyword "implicit" before "object CountOrdering {...}", then you can omit the second argument list. That's more magic than is justified. ;)
>>>
>>> HTH,
>>>
>>> dean
>>>
>>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>
>>>> Dean,
>>>>
>>>> Thanks a lot. Very helpful. How would I use takeOrdered to order by the second member of the tuple, as I am attempting to do with rdd.sortBy(_._2).first?
>>>>
>>>> On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>
>>>>> Someone please correct me if I'm wrong, but I think the answer is actually "it's not implemented that way" in the sort methods, and it should either be documented more explicitly or fixed.
>>>>>
>>>>> Reading the Spark source code, it looks like each partition is sorted internally, and each partition holds a contiguous range of keys in the RDD. So, if you know which order the partitions should be in, you can produce a total order and hence allow take(n) to do what you expect.
>>>>>
>>>>> The take(n) appears to walk the list of partitions in order, but it's that list that's not deterministic.
>>>>> I can't find any evidence that the RDD output by sortBy has this list of partitions in the correct order. So, each time you ran your job, the "targets" RDD had sorted partitions, but the list of partitions itself was not properly ordered globally. When you got an exception, probably the first partition happened to be empty.
>>>>>
>>>>> Now, you could argue that take(n) is a "debug" method and the performance implications of getting the RDD.partitions list in total order are not justified. There is a takeOrdered(n) method that is both much more efficient than sortBy(...).take(n) and does the correct thing. Still, at the very least, the documentation for take(n) should tell you what to expect.
>>>>>
>>>>> Hope I'm right and this helps!
>>>>>
>>>>> dean
>>>>>
>>>>> On Wed, Nov 18, 2015 at 5:53 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>>>
>>>>>> Dean,
>>>>>>
>>>>>> Thanks for the insight. Shouldn't take(n) or first return the same result, provided that the RDD is sorted? If I specify that the RDD is ordered, I need guarantees, as I reason about it, that the first item is in fact the first and the last is the last.
>>>>>>
>>>>>> On Wed, Nov 18, 2015 at 3:16 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>>>
>>>>>>> Methods like first() and take(n) can't guarantee to return the same result in a distributed context, because Spark uses an algorithm to grab data from one or more partitions that involves running a distributed job over the cluster, with tasks on the nodes where the chosen partitions are located. You can look at the logic in the Spark code base, in RDD.scala (the first method calls the take method) and SparkContext.scala (the runJob method, which take calls).
>>>>>>>
>>>>>>> However, the exceptions definitely look like bugs to me. There must be some empty partitions.
>>>>>>>
>>>>>>> dean
>>>>>>>
>>>>>>> On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm launching a Spark cluster with the spark-ec2 script and playing around in spark-shell. I'm running the same line of code over and over again, and getting different results, and sometimes exceptions. Towards the end, after I cache the first RDD, it gives me the correct result multiple times in a row before throwing an exception. How can I get correct behavior out of these operations on these RDDs?
>>>>>>>>
>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res26: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets = data map {_.REGEX} groupBy{identity} map { Function.tupled(_->_.size)} sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res27: (String, Int) = (nika,7)
>>>>>>>>
>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res28: (String, Int) = (\bcalientes?\b,6)
>>>>>>>>
>>>>>>>> scala> targets.sortBy(_._2,false).first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>
>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res46: (String, Int) = (\bhurting\ yous?\b,8)
>>>>>>>>
>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>
>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res48: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res49: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> res50: (String, Int) = (\bguns?\b,1253)
>>>>>>>>
>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at sortBy at <console>:36
>>>>>>>>
>>>>>>>> scala> targets.first
>>>>>>>> java.lang.UnsupportedOperationException: empty collection
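
For anyone hitting the same issue later, a minimal end-to-end sketch of the takeOrdered approach discussed above might look like the following. The local SparkContext setup, the sample data, and the TopByCount object name are assumptions for illustration only; CountOrdering is the same pattern Dean gives above, and Ordering.by(...).reverse is an equivalent way to get a descending order without a hand-written Ordering class.

import org.apache.spark.{SparkConf, SparkContext}

object TopByCount {
  // Same pattern as Dean's example: order (String, Int) pairs by the Int, descending.
  object CountOrdering extends Ordering[(String, Int)] {
    def compare(a: (String, Int), b: (String, Int)): Int =
      -(a._2 compare b._2) // negate so larger counts come first
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("top-by-count").setMaster("local[*]"))

    // Hypothetical stand-in for the "targets" RDD built in the thread.
    val targets = sc.parallelize(Seq(
      ("\\bguns?\\b", 1253), ("\\bmurders?\\b", 717), ("nika", 7)))

    // takeOrdered computes a global top-N, so the result does not depend on
    // how the partitions happen to be ordered.
    println(targets.takeOrdered(1)(CountOrdering).mkString(", "))

    // Equivalent without a hand-written Ordering.
    println(targets.takeOrdered(1)(Ordering.by[(String, Int), Int](_._2).reverse).mkString(", "))

    sc.stop()
  }
}

For this sample data, both calls should print (\bguns?\b,1253) regardless of how the data is partitioned.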