I am trying to implement secondary sort in Spark, the way it is done in MapReduce.
Here is my data (tab separated; the header row c1 c2 c3 is only for illustration and is not in the file):
c1 c2 c3
1 2 4
1 3 6
2 4 7
2 6 8
3 5 5
3 1 8
3 2 0
To do secondary sort, I create a paired RDD with keys of the form

    (c1 + "," + c2, row)

and then use a custom partitioner to partition on c1 only. I have also set

    spark.shuffle.manager = SORT

so that the keys within each partition are sorted. For the key "3" I expect to get
(3, 1)
(3, 2)
(3, 5)
but I still get the original order:
(3, 5)
(3, 1)
(3, 2)
Here is the custom partitioner code:
    class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
      def numPartitions = p
      def getPartition(key: Any) = {
        key.asInstanceOf[String].split(",")(0).toInt
      }
    }
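As a sanity check, the key-parsing expression in getPartition does send every key with the same c1 to the same partition, so the grouping itself seems fine; it is only the ordering within the partition that is off. A standalone sketch of the same expression (no Spark required):

```scala
// Re-implementation of the partitioner's key parsing, outside Spark,
// to confirm that all keys sharing c1 map to one partition number.
def partitionFor(key: String): Int = key.split(",")(0).toInt

val keys = Seq("3,5", "3,1", "3,2")
keys.foreach(k => println(s"$k -> partition ${partitionFor(k)}"))
```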
and here is the driver code. Please tell me what I am doing wrong:
    import org.apache.commons.logging.LogFactory
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("MapInheritanceExample")
    conf.set("spark.shuffle.manager", "SORT")
    val sc = new SparkContext(conf)
    val pF = sc.textFile(inputFile)
    val log = LogFactory.getLog("MapFunctionTest")
    val partitionedRDD = pF.map { x =>
      val arr = x.split("\t")
      (arr(0) + "," + arr(1), null)
    }.partitionBy(new StraightPartitioner(10))
    val outputRDD = partitionedRDD.mapPartitions(p => {
      p.map { case (o, n) => o }
    })
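To be explicit about the result I am after, here is the secondary sort computed locally on the sample data with plain Scala collections (no Spark, just to show the per-group ordering I expect):

```scala
// The sample rows (c1, c2, c3) from above.
val rows = Seq((1, 2, 4), (1, 3, 6), (2, 4, 7), (2, 6, 8),
               (3, 5, 5), (3, 1, 8), (3, 2, 0))

// Group by c1, then sort each group's c2 values -- the ordering
// I expect to see within each partition after the shuffle.
val expected = rows.groupBy(_._1).map { case (c1, group) =>
  c1 -> group.map(_._2).sorted
}

println(expected(3))  // List(1, 2, 5)
```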
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.