Methods like first() and take(n) can't guarantee to return the same result
in a distributed context, because Spark grabs the data by running a
distributed job over the cluster, with tasks on the nodes where the chosen
partitions are located; which elements come back can depend on how the data
happens to be laid out across those partitions. You can see the logic in
the Spark code base: RDD.scala (the first method calls take) and
SparkContext.scala (the runJob method, which take calls).
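
As a workaround, if what you actually want is the highest-count pair, you
can sidestep the partition-scan behavior of first entirely and reduce with
an explicit comparison instead of sortBy(...).first. A quick sketch,
assuming targets has the (String, Int) element type from your session; I
haven't run this:

    // Pick the pair with the largest count. The result doesn't depend on
    // which partitions happen to be scanned first (ties are still arbitrary).
    val top = targets.reduce((a, b) => if (a._2 >= b._2) a else b)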

However, the exceptions definitely look like a bug to me. There must be
some empty partitions, presumably left over from the shuffle that sortBy
performs.
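
If you want to check, something like this should show the size of each
partition (again untested, off the top of my head):

    // Count the elements in each partition; zeros mark empty partitions.
    targets.mapPartitions(it => Iterator(it.size)).collect()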

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com>
wrote:

> Hi,
>
> I'm launching a Spark cluster with the spark-ec2 script and playing around
> in spark-shell. I'm running the same line of code over and over again, and
> getting different results, and sometimes exceptions.  Towards the end,
> after I cache the first RDD, it gives me the correct result multiple times
> in a row before throwing an exception.  How can I get correct behavior out
> of these operations on these RDDs?
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116]
> at sortBy at <console>:36
>
> scala> targets.first
> res26: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data map {_.REGEX} groupBy{identity} map {
> Function.tupled(_->_.size)} sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125]
> at sortBy at <console>:36
>
> scala> targets.first
> res27: (String, Int) = (nika,7)
>
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134]
> at sortBy at <console>:36
>
> scala> targets.first
> res28: (String, Int) = (\bcalientes?\b,6)
>
> scala> targets.sortBy(_._2,false).first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283]
> at sortBy at <console>:36
>
> scala> targets.first
> res46: (String, Int) = (\bhurting\ yous?\b,8)
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292]
> at sortBy at <console>:36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301]
> at sortBy at <console>:36
>
> scala> targets.first
> res48: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310]
> at sortBy at <console>:36
>
> scala> targets.first
> res49: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319]
> at sortBy at <console>:36
>
> scala> targets.first
> res50: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328]
> at sortBy at <console>:36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
>