Hmm… my bad. The first exception occurs because the Iterator class is not
serializable, and my snippet tried to return something like
RDD[(String, Iterator[(Double, Double)])]. As for the second one, the for
expression returns an iterator rather than a collection, so you need to
traverse the iterator to get the result. You can materialize the iterator
into an array of location pairs to get the final filtered positions without
spending memory on the whole Cartesian product:

def distance(a: (Double, Double), b: (Double, Double)): Double = ???
// Defines some total ordering among locations.
def lessThan(a: (Double, Double), b: (Double, Double)): Boolean = ???

sc.textFile("input")
  .map { line =>
    val Array(_, latitude, longitude, ip, _, _) = line.split(",")
    ip -> (latitude.toDouble, longitude.toDouble)
  }
  .groupByKey()
  .mapValues { positions =>
    (for {
      a <- positions.iterator
      b <- positions.iterator
      if lessThan(a, b) && distance(a, b) < 100
    } yield (a, b)).toArray   // <<--- Materialize the iterator to an array
  }

This should fix both of those problems since the iterator is materialized
and Array is serializable.
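
If you want to check the for expression locally without Spark, something
like the sketch below might help. The bodies of distance and lessThan here
are only placeholders I am assuming for illustration (planar Euclidean
distance and a lexicographic ordering); the object name, the closePairs
helper and the sample data are likewise hypothetical, so substitute your
own definitions.

```scala
object PairFilterSketch {
  type Location = (Double, Double)

  // Assumption: planar Euclidean distance as a stand-in, not a geographic one.
  def distance(a: Location, b: Location): Double =
    math.hypot(a._1 - b._1, a._2 - b._2)

  // Assumption: strict lexicographic ordering on (latitude, longitude).
  // Being strict, it drops self-pairs and keeps one of (a, b) / (b, a).
  def lessThan(a: Location, b: Location): Boolean =
    a._1 < b._1 || (a._1 == b._1 && a._2 < b._2)

  // Same for expression as in the job, applied to the positions of one key.
  // A fresh inner iterator is created for every element of the outer one.
  def closePairs(positions: Iterable[Location], limit: Double): Array[(Location, Location)] =
    (for {
      a <- positions.iterator
      b <- positions.iterator
      if lessThan(a, b) && distance(a, b) < limit
    } yield (a, b)).toArray // materialize the lazy iterator

  def main(args: Array[String]): Unit = {
    val positions = Seq((0.0, 0.0), (3.0, 4.0), (300.0, 400.0))
    // Only (0.0,0.0) and (3.0,4.0) are within the limit of each other.
    closePairs(positions, 100.0).foreach(println)
  }
}
```

With a strict ordering like this, a and b never refer to the same location
in the output, and each unordered pair shows up only once.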


On Thu, Jun 5, 2014 at 3:47 PM, lmk <lakshmi.muralikrish...@gmail.com>
wrote:

> Hi Cheng,
> Sorry Again.
>
> In this method, I see that the values for
>           a <- positions.iterator
>           b <- positions.iterator
>
> always remain the same. I tried b <- positions.iterator.next instead, but
> it throws an error: value filter is not a member of (Double, Double)
>
> Is there something I am missing out here?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-handled-in-map-reduce-using-RDDs-tp6905p7033.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
