Don’t do the join first.

Broadcast your small RDD instead:

val bc = sc.broadcast(small_rdd.collectAsMap())   // collect to the driver first; an RDD itself can't be broadcast


then

large_rdd.filter(x => bc.value.contains(x._1))   // keep only keys present in the small side
  .map(x => (x._1, (bc.value(x._1), x._2)))      // attach the small side's fields to each tuple
  .distinct().groupByKey()
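
Roughly the same idea in PySpark, using the sample data from your mail and the same sc — just an untested sketch; the small_map / bc / result names and the groupByKey + collectAsMap step for shipping the small side to the executors are my own choices, not anything from your code:

small_rdd = sc.parallelize([('zab'[i % 3], i) for i in range(10)])
large_rdd = sc.parallelize([('abcdefghij'[i % 10], i) for i in range(100)] * 5)

# collect the small side to the driver as key -> [values] and broadcast it
small_map = small_rdd.groupByKey().mapValues(list).collectAsMap()
bc = sc.broadcast(small_map)

result = (large_rdd
          .filter(lambda kv: kv[0] in bc.value)   # map-side filter against broadcast keys, no join
          .distinct()                             # drop duplicate (key, value) tuples
          .groupByKey()                           # one row per key
          .map(lambda kv: (kv[0], bc.value[kv[0]] + list(kv[1]))))  # small values first, then large

result.collect()

The distinct and groupByKey still shuffle, but only over the already-filtered data; the join shuffle disappears entirely.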






> On Dec 7, 2015, at 1:41 PM, Z Z <zonked.zo...@gmail.com> wrote:
> 
> I have two RDDs, one really large in size and the other much smaller. I'd like to 
> find all unique tuples in the large RDD with keys from the small RDD. There are 
> duplicate tuples as well and I only care about the distinct tuples.
> 
> For example
> large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
> small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
> expected_rdd = [('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]), 
> ('b',   [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]
> 
> There are two expensive operations in my solution - join and distinct. Both, I 
> assume, cause a full shuffle and leave the child RDD hash partitioned. Given 
> that, is the following the best I can do?
> 
> keys = small_rdd.keys().collect()
> filtered_unique_large_rdd = large_rdd.filter(lambda (k, v): k in keys).distinct().groupByKey()
> filtered_unique_large_rdd.join(small_rdd.groupByKey()).mapValues(lambda x: sum([list(i) for i in x], [])).collect()
> 
> Basically, I filter the tuples explicitly, pick the distinct ones and then join with 
> small_rdd. I hope that the distinct operation will leave the keys hash 
> partitioned and will not cause another shuffle during the subsequent join.
> 
> Thanks in advance for any suggestions/ideas.
