Hmm… my bad. The reason for the first exception is that Iterator is not serializable, and my snippet tries to return something like RDD[(String, Iterator[(Double, Double)])]. As for the second one, a for-expression over iterators returns another iterator rather than a collection, so you need to traverse the iterator to get the result. (That also explains the error you saw: a guard in a for-expression desugars into a filter call on the generator, and positions.iterator.next yields a single (Double, Double) tuple, which has no filter method.) You can materialize the iterator into an array to get the final filtered position pairs without wasting memory on the whole Cartesian product.
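To make the difference concrete, here is a minimal sketch (toy values, just to illustrate the types involved):

    val xs = Seq(1, 2, 3)
    val pairs = for (a <- xs.iterator; b <- xs.iterator if a < b) yield (a, b)
    // pairs: Iterator[(Int, Int)] -- lazy, single-pass, and not serializable.
    // Note that xs.iterator in the inner generator is evaluated afresh for
    // every a, which is why the nested iteration works.
    pairs.toArray
    // res: Array[(Int, Int)] = Array((1,2), (1,3), (2,3)) -- materialized and serializable

Applied to your snippet, the fixed version looks like this: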
def distance(a: (Double, Double), b: (Double, Double)): Double = ???

// Defines some total ordering among locations.
def lessThan(a: (Double, Double), b: (Double, Double)): Boolean = ???

sc.textFile("input")
  .map { line =>
    val Array(_, latitude, longitude, ip, _, _) = line.split(",")
    ip -> (latitude.toDouble, longitude.toDouble)
  }
  .groupByKey()
  .mapValues { positions =>
    (for {
      a <- positions.iterator
      b <- positions.iterator
      if lessThan(a, b) && distance(a, b) < 100
    } yield (a, b)).toArray // <<--- Materialize the iterator to an array
  }

This should fix both problems, since the iterator is materialized and Array is serializable.

On Thu, Jun 5, 2014 at 3:47 PM, lmk <lakshmi.muralikrish...@gmail.com> wrote:
> Hi Cheng,
> Sorry Again.
>
> In this method, i see that the values for
> a <- positions.iterator
> b <- positions.iterator
>
> always remain the same. I tried to do a b <- positions.iterator.next, it
> throws an error: value filter is not a member of (Double, Double)
>
> Is there something I am missing out here?
>
> Thanks
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-handled-in-map-reduce-using-RDDs-tp6905p7033.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.