Hey there, I think it's overcomplicating the partitioning to specify it explicitly, since hashing is the default behaviour of Spark's partitioner. You could simply do a partitionBy and it would use the hash partitioner by default.
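For example, here is a rough, untested sketch of what I mean; the Word case class, the sample data and numPartitions below are made-up placeholders rather than the names from your code:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.sql.SparkSession

    case class Word(id: String, vector: Array[Float])

    val spark = SparkSession.builder().appName("partitioner-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val numPartitions = 8
    val words = sc.parallelize(Seq(Word("a", Array(1f)), Word("b", Array(2f))))

    // Keying by id and handing partitionBy a HashPartitioner does essentially the same
    // thing as hashing the id yourself and routing it through a pass-through partitioner.
    val partitioned = words.map(w => w.id -> w).partitionBy(new HashPartitioner(numPartitions))

    // mapValues keeps the partitioner, so a later join on the same key avoids another
    // shuffle; a plain map would drop it.
    val kept = partitioned.mapValues(identity)

    // mapPartitions only keeps the partitioner if you ask for it explicitly.
    val alsoKept = partitioned.mapPartitions(identity, preservesPartitioning = true)

    println(kept.partitioner)     // Some(HashPartitioner)
    println(alsoKept.partitioner) // Some(HashPartitioner)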
Let me know if I've misinterpreted the code. I also think using map after partitioning will cause Spark to lose the partitioner.

On Sun, 30 Jun 2019 at 20:56, jelmer <jkupe...@gmail.com> wrote:

> Does something like the code below make any sense, or would there be a more efficient way to do it?
>
>     val wordsOnOnePartition = input
>       .map { word => Math.abs(word.id.hashCode) % numPartitions -> word }
>       .partitionBy(new PartitionIdPassthrough(numPartitions))
>
>     val indices = wordsOnOnePartition
>       .mapPartitions(it => new IndexIterator(it, m))
>       .cache()
>
>     val wordsOnEachPartition = input
>       .flatMap(word => 0 until numPartitions map { partition => partition -> word })
>       .partitionBy(new PartitionIdPassthrough(numPartitions))
>
>     val nearest = indices.join(wordsOnEachPartition)
>       .flatMap { case (_, (index, Word(word, vector))) =>
>         index.findNearest(vector, k + 1).collect {
>           case SearchResult(Word(relatedWord, _), score) if relatedWord != word =>
>             RelatedItem(word, relatedWord, score)
>         }
>         .take(k)
>       }
>
>     val result = nearest.groupBy(_.word).map { case (word, relatedItems) =>
>       word +: relatedItems.toSeq
>         .sortBy(_.similarity)(Ordering[Double].reverse)
>         .map(_.relatedWord)
>         .take(k)
>         .mkString("\t")
>     }
>
> I manually assign a partition to each word of a list of words, and repartition the rdd by this partition key.
>
> There I use mapPartitions to construct a partial index, so I end up with one index in each partition.
>
> Then I read the words again, but this time assign every partition to each word and join it on the indices rdd by partition key, so effectively every index will be queried.
>
> Finally I merge the results from each index into a single list, keeping only the most relevant items by doing a groupBy.
>
> On Sun, 30 Jun 2019 at 01:45, Chris Teoh <chris.t...@gmail.com> wrote:
>
>> The closest thing I can think of here is if you have both dataframes written out using buckets. Hive uses this technique for join optimisation such that both datasets of the same bucket are read by the same mapper to achieve map-side joins.
>>
>> On Sat., 29 Jun. 2019, 9:10 pm jelmer, <jkupe...@gmail.com> wrote:
>>
>>> I have 2 dataframes:
>>>
>>> Dataframe A, which contains 1 element per partition that is gigabytes big (an index).
>>>
>>> Dataframe B, which is made up of millions of small rows.
>>>
>>> I want to join B on A, but I want all the work to be done on the executors holding the partitions of dataframe A.
>>>
>>> Is there a way to accomplish this without putting dataframe B in a broadcast variable or doing a broadcast join?

--
Chris
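For completeness, a rough, untested sketch of the bucketing approach from the 30 Jun 01:45 reply quoted above, assuming both sides can be written out as bucketed tables first; the table names, join column, bucket count and sample data are made up for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("bucketed-join-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Stand-ins for dataframe A (one big index element per key) and dataframe B (many small rows).
    val dfA = Seq((1, "big-index-blob"), (2, "another-index-blob")).toDF("id", "index")
    val dfB = Seq((1, "row1"), (2, "row2")).toDF("id", "payload")

    // Write both sides bucketed (and sorted) on the join key with the same number of buckets,
    // so matching buckets line up when the tables are read back.
    dfA.write.bucketBy(16, "id").sortBy("id").saveAsTable("index_bucketed")
    dfB.write.bucketBy(16, "id").sortBy("id").saveAsTable("rows_bucketed")

    // Disable broadcast joins so Spark keeps the sort-merge join over the bucketed tables.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val joined = spark.table("index_bucketed").join(spark.table("rows_bucketed"), "id")
    joined.explain() // the plan should show no Exchange on either side of the join

With matching bucket counts on the join key, each task reads the corresponding bucket from both tables, so neither side needs a shuffle in the join plan.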