Does something like the code below make any sense, or would there be a more
efficient way to do it?

    // Assign each word to one partition by hashing its id, then repartition
    // so all words with the same key land in the same partition.
    val wordsOnOnePartition = input
      .map { word => Math.abs(word.id.hashCode) % numPartitions -> word }
      .partitionBy(new PartitionIdPassthrough(numPartitions))

    // Build one partial index per partition and keep it cached.
    val indices = wordsOnOnePartition
      .mapPartitions(it => new IndexIterator(it, m))
      .cache()

    // Replicate every word to every partition so that each word is joined
    // against each partial index.
    val wordsOnEachPartition = input
      .flatMap(word => (0 until numPartitions).map(partition => partition -> word))
      .partitionBy(new PartitionIdPassthrough(numPartitions))

    // Query every partial index, dropping each word from its own results.
    val nearest = indices.join(wordsOnEachPartition)
      .flatMap { case (_, (index, Word(word, vector))) =>
        index.findNearest(vector, k + 1)
          .collect {
            case SearchResult(Word(relatedWord, _), score) if relatedWord != word =>
              RelatedItem(word, relatedWord, score)
          }
          .take(k)
      }

    // Merge the partial results, keeping only the k most similar words,
    // and emit one tab-separated line per word (note the parentheses:
    // the word is prepended to the Seq before mkString is applied).
    val result = nearest.groupBy(_.word).map { case (word, relatedItems) =>
      (word +: relatedItems.toSeq
        .sortBy(_.similarity)(Ordering[Double].reverse)
        .map(_.relatedWord)
        .take(k))
        .mkString("\t")
    }

I manually assign a partition to each word of a list of words, and
repartition the RDD by this partition key. A sketch of the partitioner is
shown below.
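
PartitionIdPassthrough isn't shown above; presumably it is just a
Partitioner that passes the precomputed key through, something like this
sketch:

    import org.apache.spark.Partitioner

    // Sketch: the key is already a partition id, so return it unchanged.
    class PartitionIdPassthrough(override val numPartitions: Int)
        extends Partitioner {
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }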

Then I use mapPartitions to construct a partial index, so I end up with one
index in each partition.
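
IndexIterator isn't shown either; a minimal sketch of the idea, with a
made-up Index type and builder, might look like:

    // Hypothetical sketch: drain one partition's words into a single index
    // and emit one (partitionId, index) pair for the join below.
    class IndexIterator(it: Iterator[(Int, Word)], m: Int)
        extends Iterator[(Int, Index)] {
      private var done = false
      override def hasNext: Boolean = !done && it.hasNext
      override def next(): (Int, Index) = {
        val words = it.toVector
        done = true
        words.head._1 -> Index.build(words.map(_._2), m) // Index.build is made up
      }
    }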

Then I read the words again, but this time assign every partition to each
word (e.g. with numPartitions = 3, a word w becomes (0, w), (1, w) and
(2, w)), and join it with the indices RDD by partition key. So effectively
every index will be queried.

Finally I merge the results from each index into a single list, keeping
only the most relevant items, by doing a groupBy.
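
For reference, a possibly cheaper variant of that final merge (an untested
sketch): groupBy shuffles every candidate, whereas reduceByKey can keep only
the top k per word while merging, so less data moves:

    // Keep at most k candidates per word while merging, instead of grouping
    // full candidate lists and truncating afterwards.
    val result = nearest
      .map(item => item.word -> List(item))
      .reduceByKey { (a, b) => (a ++ b).sortBy(-_.similarity).take(k) }
      .map { case (word, items) =>
        (word +: items.map(_.relatedWord)).mkString("\t")
      }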



On Sun, 30 Jun 2019 at 01:45, Chris Teoh <chris.t...@gmail.com> wrote:

> The closest thing I can think of here is if you have both dataframes
> written out using buckets. Hive uses this technique for join optimisation,
> such that both datasets of the same bucket are read by the same mapper to
> achieve map-side joins.
>
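> A rough sketch of that bucketing idea (assuming the standard
> DataFrameWriter.bucketBy API; the table and column names are made up):
>
>     // Write both sides bucketed and sorted on the join key into the
>     // metastore, so a later join on that key can avoid a full shuffle.
>     dfA.write.bucketBy(64, "id").sortBy("id").saveAsTable("a_bucketed")
>     dfB.write.bucketBy(64, "id").sortBy("id").saveAsTable("b_bucketed")
>
>     val joined = spark.table("a_bucketed")
>       .join(spark.table("b_bucketed"), "id")
>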
> On Sat., 29 Jun. 2019, 9:10 pm jelmer, <jkupe...@gmail.com> wrote:
>
>> I have 2 dataframes:
>>
>> Dataframe A, which contains one element per partition that is gigabytes
>> big (an index).
>>
>> Dataframe B, which is made up of millions of small rows.
>>
>> I want to join B on A, but I want all the work to be done on the
>> executors holding the partitions of dataframe A.
>>
>> Is there a way to accomplish this without putting dataframe B in a
>> broadcast variable or doing a broadcast join?
>>
>>
