Methods like first() and take(n) aren't guaranteed to return the same result in a distributed context, because Spark grabs data from one or more partitions by running a distributed job over the cluster, with tasks on the nodes where the chosen partitions reside. You can see the logic in the Spark code base: RDD.scala (the first method calls the take method) and SparkContext.scala (the runJob method, which take calls).
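
If you need a stable answer despite that, one workaround (just a sketch, reusing the data RDD and REGEX field from the quoted session, untested here) is to make the ordering total, so count ties can't flip between runs:

    // Sort by descending count, then by the key itself, so ties are broken
    // deterministically. Assumes the same `data` and REGEX as in the session below.
    val targets = data.map(_.REGEX)
      .groupBy(identity)
      .map { case (k, vs) => (k, vs.size) }
      .sortBy { case (k, n) => (-n, k) }

    targets.first  // deterministic for a given input dataset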
However, the exceptions definitely look like bugs to me. There must be some empty partitions.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com> wrote:

> Hi,
>
> I'm launching a Spark cluster with the spark-ec2 script and playing around
> in spark-shell. I'm running the same line of code over and over again, and
> getting different results, and sometimes exceptions. Towards the end,
> after I cache the first RDD, it gives me the correct result multiple times
> in a row before throwing an exception. How can I get correct behavior out
> of these operations on these RDDs?
>
> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] at sortBy at <console>:36
>
> scala> targets.first
> res26: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data map {_.REGEX} groupBy{identity} map { Function.tupled(_->_.size)} sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] at sortBy at <console>:36
>
> scala> targets.first
> res27: (String, Int) = (nika,7)
>
> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] at sortBy at <console>:36
>
> scala> targets.first
> res28: (String, Int) = (\bcalientes?\b,6)
>
> scala> targets.sortBy(_._2,false).first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] at sortBy at <console>:36
>
> scala> targets.first
> res46: (String, Int) = (\bhurting\ yous?\b,8)
>
> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] at sortBy at <console>:36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] at sortBy at <console>:36
>
> scala> targets.first
> res48: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at sortBy at <console>:36
>
> scala> targets.first
> res49: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] at sortBy at <console>:36
>
> scala> targets.first
> res50: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at sortBy at <console>:36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
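
For what it's worth, "empty collection" is the exact message RDD.first throws when its internal take(1) comes back with no elements, which is easy to see in local mode (a minimal sketch with assumed setup, not the cluster configuration from the thread; in the session above the RDD clearly wasn't empty, which is why it looks like a bug):

    import org.apache.spark.{SparkConf, SparkContext}

    // Local-mode reproduction of the error path: first() delegates to take(1)
    // and throws UnsupportedOperationException when nothing comes back.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("empty-first"))
    val empty = sc.parallelize(Seq.empty[Int], numSlices = 4)  // 4 empty partitions
    empty.first()  // java.lang.UnsupportedOperationException: empty collection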