Don’t do the join first. Broadcast your small RDD (collected as a map), filter the large RDD against it, attach the small RDD's fields, then deduplicate and group; roughly:
val bc = sc.broadcast(small_rdd.collectAsMap())
large_rdd.filter(x => bc.value.contains(x._1)).map(x => (x._1, (bc.value(x._1), x._2))).distinct.groupByKey

> On Dec 7, 2015, at 1:41 PM, Z Z <zonked.zo...@gmail.com> wrote:
>
> I have two RDDs, one really large in size and the other much smaller. I'd like
> to find all unique tuples in the large RDD with keys from the small RDD. There
> are duplicate tuples as well, and I only care about the distinct tuples.
>
> For example:
>
> large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
> small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
> expected_rdd = [('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
>                 ('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]
>
> There are two expensive operations in my solution - join and distinct. I assume
> both cause a full shuffle and leave the child RDD hash partitioned. Given that,
> is the following the best I can do?
>
> keys = small_rdd.keys().collect()
> filtered_unique_large_rdd = large_rdd.filter(lambda (k, v): k in keys).distinct().groupByKey()
> filtered_unique_large_rdd.join(small_rdd.groupByKey()).mapValues(lambda x: sum([list(i) for i in x], [])).collect()
>
> Basically, I filter the tuples explicitly, pick the distinct ones and then join
> with small_rdd. I hope that the distinct operation will leave the keys hash
> partitioned and will not cause another shuffle during the subsequent join.
>
> Thanks in advance for any suggestions/ideas.
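
In PySpark terms, the broadcast-and-filter idea above would look roughly like the sketch below. This is only an illustration, not code from the thread: the keys_bc name is made up, and it assumes small_rdd's keys fit in driver memory.

from pyspark import SparkContext

sc = SparkContext(appName="broadcast-filter-sketch")

# Same toy data as in the question: (key, value) pairs with duplicates.
large_rdd = sc.parallelize([('abcdefghij'[i % 10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i % 3], i) for i in range(10)])

# Collect the small RDD's keys on the driver and broadcast them to the executors.
keys_bc = sc.broadcast(set(small_rdd.keys().collect()))

# Filter map-side against the broadcast set (no shuffle for the filter itself),
# then deduplicate and group; only distinct() and groupByKey() shuffle.
result = (large_rdd
          .filter(lambda kv: kv[0] in keys_bc.value)
          .distinct()
          .groupByKey()
          .mapValues(list))

print(result.collect())

Broadcasting a plain Python set keeps the per-record membership test cheap and avoids shipping small_rdd through a join at all.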