This is my first implementation. There are a few rough edges, but when I run
it I get the exception below. The class extends Partitioner, which in turn
extends Serializable. Any idea what I am doing wrong?
scala> res156.partitionBy(new EqualWeightPartitioner(1000, res156, weightFunction))
14/05/09 16:59:36 INFO SparkContext: Starting job: histogram at <console>:197
14/05/09 16:59:36 INFO DAGScheduler: Got job 18 (histogram at <console>:197) with 250 output partitions (allowLocal=false)
14/05/09 16:59:36 INFO DAGScheduler: Final stage: Stage 18 (histogram at <console>:197)
14/05/09 16:59:36 INFO DAGScheduler: Parents of final stage: List()
14/05/09 16:59:36 INFO DAGScheduler: Missing parents: List()
14/05/09 16:59:36 INFO DAGScheduler: Submitting Stage 18 (MapPartitionsRDD[36] at histogram at <console>:197), which has no missing parents
14/05/09 16:59:36 INFO DAGScheduler: Failed to run histogram at <console>:197
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: scala.util.Random
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
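
For reference, the only way I know to get this particular message is for
something reachable from a task closure to hold a scala.util.Random (which
is not Serializable in Scala 2.10). A contrived sketch of that failure mode,
with made-up names (Holder, sample) that are not from my actual code:

import scala.util.Random
import org.apache.spark.SparkContext

// Contrived example: Holder itself is Serializable, but the map() closure
// reads the field `rnd`, so it captures `this`, and serializing `this`
// fails on the Random field with exactly the exception above.
class Holder(@transient sc: SparkContext) extends Serializable {
  private val rnd = new Random // scala.util.Random is not Serializable in 2.10

  def sample(): Array[Int] =
    sc.parallelize(1 to 10).map(i => i * rnd.nextInt(10)).collect()
}

Constructing the Random inside the closure (or marking the field @transient)
makes that sketch run, so I assume something similar is being captured in my
code, but I cannot see where.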
==================================
import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._ // implicits that add histogram() and partitionBy()
import org.apache.spark.rdd.RDD
/**
 * A [[org.apache.spark.Partitioner]] that assigns records to partitions of
 * roughly equal total weight. The weight distribution is estimated by building
 * a histogram of the weight function over the content of the RDD passed in.
 */
class EqualWeightPartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    weightFunction: K => Double) extends Partitioner {

  private val ordering = implicitly[Ordering[K]]

  // Histogram of the key weights over the whole RDD: _1 holds the
  // (partitions + 1) bucket boundaries, _2 the record count per bucket.
  private val histogram =
    rdd.map(x => weightFunction(x._1)).histogram(partitions)

  private val bucketSize = histogram._1(1) - histogram._1(0)

  // TODO: refine this algorithm to use single partitions.
  // For each histogram bucket, the (first, last) range of partition ids
  // that keys falling into that bucket may be sent to.
  private val bucketToPartitionMap: Array[(Int, Int)] = {
    if (partitions == 1) {
      Array()
    } else {
      // Cumulative weight up to and including each bucket, approximating a
      // bucket's weight as its record count times its midpoint weight.
      val bucketWeights = histogram._2.zipWithIndex
        .map { case (count, idx) => count * (histogram._1(idx) + histogram._1(idx + 1)) / 2 }
        .scanLeft(0.0)(_ + _)
        .tail
      val averageWeight = bucketWeights.last / partitions
      val bucketPartition: Array[(Int, Int)] = bucketWeights
        .map(w => Math.max(0, Math.round(w / averageWeight) - 1).toInt)
        .zipWithIndex
      bucketPartition.map {
        case (p, 0)   => (0, p)
        case (p, idx) => (Math.min(partitions - 1, bucketPartition(idx - 1)._1 + 1), p)
      }
    }
  }
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    if (partitions == 1) {
      0 // single-partition case: bucketToPartitionMap is empty
    } else {
      // Random is created per call so it never becomes part of the
      // serialized partitioner.
      val rnd = new Random
      val k = key.asInstanceOf[K]
      val weight: Double = weightFunction(k)
      // 1-based histogram bucket for this weight, clamped to [1, partitions]
      // (the earlier Math.min(1, ...) could underflow to index -1).
      val bucketIndex: Int =
        Math.min(partitions, Math.max(1, ((weight - histogram._1(0)) / bucketSize).toInt + 1))
      // Spread keys uniformly across the bucket's partition range.
      val partitionRange: (Int, Int) = bucketToPartitionMap(bucketIndex - 1)
      partitionRange._1 + rnd.nextInt(partitionRange._2 - partitionRange._1 + 1)
    }
  }
  override def equals(other: Any): Boolean = other match {
    case r: EqualWeightPartitioner[_, _] =>
      r.bucketToPartitionMap.sameElements(bucketToPartitionMap)
    case _ =>
      false
  }

  // equals is overridden, so hashCode must stay consistent with it.
  override def hashCode: Int = bucketToPartitionMap.toSeq.hashCode()
}
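
For completeness, this is roughly how I am driving it from the shell, given
the imports above. The data and weight function are toy stand-ins for my
real res156 RDD, using the shell's sc:

// Toy pair RDD and weight function, standing in for the real res156.
val data: RDD[(Int, String)] =
  sc.parallelize((1 to 100000).map(i => (i, "record-" + i)))
val weightFunction: Int => Double = k => math.log1p(k.toDouble)

// Same shape as the res156.partitionBy(...) call at the top of this message.
val partitioned = data.partitionBy(new EqualWeightPartitioner(1000, data, weightFunction))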