Another approach I found:
First, I made a PartitionsRDD class that only exposes a given contiguous range of its parent's partitions:
-----------------------------------------------------
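import scala.reflect.ClassTag
import org.apache.spark.{Partition, RangeDependency, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps one parent partition; `index` is its position in the new RDD.
case class PartitionsRDDPartition(index: Int, origSplit: Partition) extends Partition

// Exposes `take` partitions of `prev`, starting at parent partition `drop`.
// A RangeDependency (instead of the one-to-one dependency that RDD[U](prev)
// declares) tells the scheduler that child partition i reads parent
// partition drop + i, so locality lookups hit the right parent partition.
class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop: Int, take: Int)
  extends RDD[U](prev.context, Seq(new RangeDependency(prev, drop, 0, take))) {

  override def getPartitions: Array[Partition] =
    prev.partitions.drop(drop).take(take).zipWithIndex.map {
      case (split, idx) => PartitionsRDDPartition(idx, split): Partition
    }

  // Delegate to the wrapped parent partition.
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split.asInstanceOf[PartitionsRDDPartition].origSplit, context)
}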
-----------------------------------------------------
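To make the index bookkeeping concrete, here is a small Spark-free sketch (the object and method names are hypothetical, and partitions are modelled as plain indices) of the mapping getPartitions builds: child partition i reads parent partition drop + i. It also contrasts what a one-to-one dependency, which the RDD[U](prev) constructor declares, reports as the parent (always i) with what Spark's RangeDependency(rdd, inStart, outStart, length) reports for inStart = drop and outStart = 0. The two only agree for the left slice, where drop is 0.

```scala
// Hypothetical, Spark-free model of the partition index mapping in PartitionsRDD.
object PartitionRangeModel {
  // Parent partition indices that PartitionsRDD(prev, drop, take) exposes,
  // in child order: child partition i reads parent partition drop + i.
  def parentIndices(numParent: Int, drop: Int, take: Int): Vector[Int] =
    (0 until numParent).drop(drop).take(take).toVector

  // Parent reported by RangeDependency(prev, inStart = drop, outStart = 0, length = take).
  def rangeParent(child: Int, drop: Int, take: Int): List[Int] =
    if (child >= 0 && child < take) List(child + drop) else Nil

  // Parent reported by a one-to-one dependency: always the same index.
  def oneToOneParent(child: Int): List[Int] = List(child)

  def main(args: Array[String]): Unit = {
    val numParent = 10
    val (drop, take) = (3, 7) // the "right" slice for hotPartitions = 3
    for ((parent, child) <- parentIndices(numParent, drop, take).zipWithIndex) {
      println(s"child $child reads parent $parent; " +
        s"range dependency: ${rangeParent(child, drop, take)}, one-to-one: ${oneToOneParent(child)}")
    }
  }
}
```

For child 0 of the right slice, the range dependency reports parent 3, while the one-to-one view reports parent 0.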
Then I can create my two RDDs with the following:
-----------------------------------------------------
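// Splits `rdd` into its first `hotPartitions` partitions and the remaining ones.
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // partitions.length is the partition count (the value getNumPartitions returns).
  val numPartitions = rdd.partitions.length
  require(hotPartitions >= 0 && hotPartitions <= numPartitions,
    s"hotPartitions must be between 0 and $numPartitions, got $hotPartitions")
  val left = new PartitionsRDD[T](rdd, 0, hotPartitions)
  val right = new PartitionsRDD[T](rdd, hotPartitions, numPartitions - hotPartitions)
  (left, right)
}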
-----------------------------------------------------
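As a sanity check on splitByPartition, the following Spark-free sketch (hypothetical names, with partitions modelled as plain indices) confirms that the two index ranges are disjoint and together cover every parent partition. That property may be one reason for the speed-up: evaluating both sides computes each parent partition roughly once, whereas splitting an uncached RDD with, for example, two filter passes recomputes every partition twice. This is an assumption, since the previous approach is not shown here.

```scala
// Hypothetical, Spark-free check of the index split that splitByPartition performs.
object SplitModel {
  // Parent indices each side reads, mirroring
  // PartitionsRDD(rdd, 0, hot) and PartitionsRDD(rdd, hot, n - hot).
  def split(numPartitions: Int, hotPartitions: Int): (Vector[Int], Vector[Int]) = {
    require(hotPartitions >= 0 && hotPartitions <= numPartitions,
      s"hotPartitions must be in [0, $numPartitions], got $hotPartitions")
    val all = (0 until numPartitions).toVector
    (all.take(hotPartitions), all.drop(hotPartitions).take(numPartitions - hotPartitions))
  }

  def main(args: Array[String]): Unit = {
    val (left, right) = split(8, 3)
    println(s"left = $left, right = $right")
    // Every parent partition lands on exactly one side.
    assert((left ++ right) == (0 until 8).toVector)
    assert(left.intersect(right).isEmpty)
  }
}
```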
This approach saves a few minutes compared with the one in my previous
post (at least in a local test; I still need to try it on a real
cluster).
Any thoughts on this? Is this the right way to do it, or am I missing
something important?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-by-partition-tp26983p26985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.