I have two RDDs, one really large and the other much smaller. I'd like to
find all unique tuples in the large RDD whose keys appear in the small RDD.
The large RDD contains duplicate tuples as well, and I only care about the
distinct ones.

For example:

large_rdd = sc.parallelize([('abcdefghij'[i % 10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i % 3], i) for i in range(10)])
expected_rdd = [('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
                ('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]

(small_rdd's key 'z' never appears in large_rdd, so it drops out; each
result combines the values for that key from both RDDs.)

There are two expensive operations in my solution: join and distinct. I
assume both cause a full shuffle and leave the child RDD hash partitioned.
Given that, is the following the best I can do?

keys = small_rdd.keys().collect()
filtered_unique_large_rdd = (large_rdd
                             .filter(lambda kv: kv[0] in keys)  # keep only keys seen in small_rdd
                             .distinct()
                             .groupByKey())
(filtered_unique_large_rdd
 .join(small_rdd.groupByKey())
 .mapValues(lambda x: sum([list(i) for i in x], []))  # flatten the pair of grouped value lists
 .collect())
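
As a sanity check on where the hash partitioning actually appears, here is
a quick sketch using PySpark's RDD.partitioner attribute (the prints are
for inspection only, not part of the pipeline). This is what I'd expect,
if I'm reading the PySpark source right:

deduped = large_rdd.filter(lambda kv: kv[0] in keys).distinct()
print(deduped.partitioner)  # None, I believe: distinct() ends in a plain map(), which drops the partitioner
grouped = deduped.groupByKey()
print(grouped.partitioner)  # a hash partitioner on the key, installed by groupByKey()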

To recap: I filter the tuples explicitly, take the distinct ones, and then
join with the small RDD. My hope is that the distinct operation leaves the
RDD hash partitioned by key so that the subsequent join does not cause
another shuffle.
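
One variation I'm also considering (an untested sketch; it assumes the
small RDD's key set fits comfortably in memory, and key_set is just my
name for the broadcast variable): broadcast the keys as a set, so each
executor receives them once and the membership test is O(1) instead of a
scan over a list captured in the closure.

key_set = sc.broadcast(set(small_rdd.keys().collect()))
result = (large_rdd
          .filter(lambda kv: kv[0] in key_set.value)  # set lookup via the broadcast value
          .distinct()
          .groupByKey()
          .join(small_rdd.groupByKey())
          .mapValues(lambda x: sum([list(i) for i in x], []))
          .collect())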

Thanks in advance for any suggestions/ideas.
