I've seen that implementation before. The problem there is this: if a pair of points falls into the same bucket under K of the hash functions, the distance between them gets computed K times, and the data for that pair gets shuffled over the network up to K times in total. So it reduces to the same problem I have. I'm trying a new approach which I think will be better than my original one:

val part1 = rdd1.map(x => (x._1, x)).join(rdd2).map(_._2)
val part2 = rdd1.map(x => (x._2, x)).join(rdd2).map(_._2)
val distances = part1.join(part2).mapValues(v => measure.compute(v._1, v._2))
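To make that concrete, here's a minimal, self-contained sketch of the same two-join idea. The placeholder Euclidean distance, the toy data, and the distinct() call are my own additions (distinct() is just one way to drop the K duplicate copies of a candidate pair before any joining happens); everything else mirrors the snippet above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PairDistances {
  type Vec = Array[Double]

  // Placeholder distance standing in for measure.compute
  def euclidean(a: Vec, b: Vec): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pair-distances").setMaster("local[*]"))

    // Candidate pairs of ids (with one duplicate, as produced by multiple hashes)
    val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1, 2), (1, 3), (1, 2)))
    // Points keyed by id
    val rdd2: RDD[(Int, Vec)] = sc.parallelize(Seq(
      1 -> Array(0.0, 0.0),
      2 -> Array(3.0, 4.0),
      3 -> Array(1.0, 1.0)
    ))

    val pairs = rdd1.distinct()                                  // drop the K duplicate copies up front
    val part1 = pairs.map(p => (p._1, p)).join(rdd2).map(_._2)   // ((id1, id2), v1)
    val part2 = pairs.map(p => (p._2, p)).join(rdd2).map(_._2)   // ((id1, id2), v2)
    val distances = part1.join(part2).mapValues { case (v1, v2) => euclidean(v1, v2) }

    distances.collect().foreach(println)
    sc.stop()
  }
}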
And I'm sorry for the ugly subject line of the email. I forgot to check it before sending.

2016-04-28 10:10 GMT+07:00 Karl Higley <kmhig...@gmail.com>:

> One idea is to avoid materializing the pairs of points before computing
> the distances between them. You could do that using the LSH signatures by
> building (Signature, (Int, Vector)) tuples, grouping by signature, and then
> iterating pairwise over the resulting lists of points to compute the
> distances between them. The points still have to be shuffled over the
> network, but at least the shuffle doesn't create multiple copies of each
> point (like a join by point ids would).
>
> Here's an implementation of that idea in the context of finding nearest
> neighbors:
>
> https://github.com/karlhigley/spark-neighbors/blob/master/src/main/scala/com/github/karlhigley/spark/neighbors/ANNModel.scala#L33-L34
>
> Best,
> Karl
>
> On Wed, Apr 27, 2016 at 10:22 PM nguyen duc tuan <newvalu...@gmail.com>
> wrote:
>
>> Hi all,
>> Currently, I'm working on implementing LSH on Spark, which leads to the
>> following problem. I have an RDD[(Int, Int)] that stores all pairs of ids
>> of vectors whose distance needs to be computed, and another
>> RDD[(Int, Vector)] that stores all vectors with their ids. Can anyone
>> suggest an efficient way to compute the distances? The simple version I
>> tried first is as follows, but it's inefficient because it requires
>> shuffling a lot of data over the network.
>>
>> val rdd1: RDD[(Int, Int)] = ...
>> val rdd2: RDD[(Int, Vector)] = ...
>> val distances = rdd2.cartesian(rdd2)
>>   .map(x => ((x._1._1, x._2._1), (x._1._2, x._2._2)))
>>   .join(rdd1.map(x => (x, 1)))
>>   .mapValues(x => measure.compute(x._1._1, x._1._2))
>>
>> Thanks for any suggestion.
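For anyone following along, here's a rough sketch of my reading of the grouping idea Karl describes. This is not the code from spark-neighbors: modeling the signature as a String, using groupByKey for the bucketing, and deduplicating with reduceByKey are all my own assumptions.

import org.apache.spark.rdd.RDD

def bucketDistances[V](
    hashed: RDD[(String, (Int, V))],  // (signature, (id, vector))
    compute: (V, V) => Double
): RDD[((Int, Int), Double)] = {
  hashed
    .groupByKey()                     // each point is shuffled once per signature it hashes to
    .flatMap { case (_, bucket) =>
      val pts = bucket.toArray
      // Iterate pairwise over the points that share this bucket
      for {
        i <- pts.indices.iterator
        j <- (i + 1 until pts.length).iterator
      } yield {
        val (id1, v1) = pts(i)
        val (id2, v2) = pts(j)
        (if (id1 < id2) (id1, id2) else (id2, id1), compute(v1, v2))
      }
    }
    .reduceByKey((d, _) => d)         // keep one distance per pair that co-occurs in several buckets
}

The trade-off relative to the join version: each point crosses the network once per signature rather than once per candidate pair, but a pair sharing several buckets still has its distance computed once per shared bucket before the reduceByKey collapses the duplicates.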