Hi All,
When I submit a Spark job on YARN with a custom partitioner, the executors do
not pick it up; they still use the default HashPartitioner. I added log lines
to both HashPartitioner (org/apache/spark/Partitioner.scala) and the custom
partitioner, and the completed executor logs show only HashPartitioner.
Below are the log line I added to the HashPartitioner class of
Partitioner.scala, the resulting executor log output, and the Spark
application code with the custom partitioner.
log.info("HashPartitioner=" + key + "---" + numPartitions + "----" + Utils.nonNegativeMod(key.hashCode, numPartitions))
The executor logs show:
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---4----2
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---4----2
........
How can I make sure the executors are picking the right partitioner?
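For what it's worth, the logged value does match HashPartitioner's formula. A quick plain-Scala check (re-implementing Utils.nonNegativeMod locally, no Spark needed):

```scala
// Local re-implementation of org.apache.spark.util.Utils.nonNegativeMod:
// HashPartitioner places a key in partition nonNegativeMod(key.hashCode, numPartitions).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

println(nonNegativeMod("INFO".hashCode, 4))  // prints 2, matching "INFO---4----2"
```

So those log lines are consistent with a HashPartitioner of 4 partitions, not with ExactPartitioner(5).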
Code:
package org.apache.spark
class ExactPartitioner(partitions: Int) extends Partitioner with Logging {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    log.info("ExactPartitioner=" + key)
    key match {
      case "INFO"  => 0
      case "DEBUG" => 1
      case "ERROR" => 2
      case "WARN"  => 3
      case "FATAL" => 4
      case _       => 0  // catch-all: without it the match throws MatchError on unexpected levels
    }
  }
}
object GroupByCLDB {

  def main(args: Array[String]) {
    val logFile = "/DATA"
    val sparkConf = new SparkConf().setAppName("GroupBy")
    sparkConf.set("spark.executor.memory", "4g")
    sparkConf.set("spark.executor.cores", "2")
    sparkConf.set("spark.executor.instances", "2")
    val sc = new SparkContext(sparkConf)
    val logData = sc.textFile(logFile)

    case class LogClass(one: String, two: String)

    def parse(line: String) = {
      val pieces = line.split(' ')
      val level = pieces(2)
      val one = pieces(0)
      val two = pieces(1)
      (level, LogClass(one, two))
    }

    val output = logData.map(x => parse(x))
    val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
    val groups = partitioned.groupByKey(new ExactPartitioner(5))
    groups.count()
    output.partitions.size
    partitioned.partitions.size
  }
}
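One driver-side check I know of (a sketch against the partitioned RDD above): rdd.partitioner returns the partitioner the driver has recorded, though it does not prove which class the executor tasks actually load:

```scala
// Driver-side check (sketch): rdd.partitioner is the Option[Partitioner]
// the driver has recorded for the RDD; it says nothing about the code
// path the executors take during the shuffle itself.
partitioned.partitioner match {
  case Some(p) => println("partitioned uses: " + p.getClass.getName)
  case None    => println("partitioned has no partitioner set")
}
```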
Thanks,
Prabhu Joseph