Hi, I'm running a benchmark that compares Mahout and Spark MLlib. So far I
have the following results for k-means:
Number of iterations= 10, number of elements = 10000000, mahouttime= 602, spark time = 138
Number of iterations= 40, number of elements = 10000000, mahouttime= 1917, spark time = 330
Number of iterations= 70, number of elements = 10000000, mahouttime= 3203, spark time = 388
Number of iterations= 10, number of elements = 100000000, mahouttime= 1235, spark time = 2226
Number of iterations= 40, number of elements = 100000000, mahouttime= 2755, spark time = 6388
Number of iterations= 70, number of elements = 100000000, mahouttime= 4107, spark time = 10967
Number of iterations= 10, number of elements = 1000000000, mahouttime= 7070, spark time = 25268
Times are in seconds. The benchmark runs on a YARN cluster of about 40 machines. The points
to be clustered are generated randomly. When I changed the persistence level
from memory-only to MEMORY_AND_DISK, Spark became faster on the large data sets.
Spark beats Mahout on 10^7 elements but is much slower on 10^8 and 10^9 elements. What am I missing?
My benchmarking code is attached.
--
*Sincerely yours, Egor Pakhomov, Scala Developer, Yandex*
package ru.yandex.spark.examples
import scala.util.Random
import scala.collection.mutable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import ru.yandex.spark.benchmark.Job
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{LoggerFactory, Logger}
import org.apache.spark.storage.StorageLevel
/**
 * Benchmarks MLlib k-means on Spark (YARN client mode) against a Mahout k-means job.
 *
 * For every input size listed in `main`, a data set of random points is written once to
 * [[inputDataPath]] on the default Hadoop FileSystem. Both implementations then cluster that
 * same file for several iteration counts. The elapsed time of each run is reported in whole
 * seconds, through the logger and on stdout.
 */
object KMeansBenchMark {
  private final val log: Logger = LoggerFactory.getLogger(this.getClass)
  val benchPath: Path = new Path("/tmp/benchmark")
  val inputDataPath: Path = new Path("/tmp/benchmark/testdata")
  val outputDataPath: Path = new Path("/tmp/benchmark/output")
  val configuration = new Configuration()
  val fs = FileSystem.get(FileSystem.getDefaultUri(configuration), configuration)

  /** Number of space-separated coordinates written per generated point (one point per line). */
  private val PointDimensions = 13

  /**
   * Runs every (input size, iteration count) combination and prints the accumulated
   * result table after each run.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    type MahoutTime = Long
    type SparkTime = Long
    type NumberOfIterations = Int
    type NumberOfElements = Long
    val result = mutable.ListBuffer.empty[(NumberOfIterations, NumberOfElements, MahoutTime, SparkTime)]
    System.setProperty("SPARK_YARN_APP_JAR", jarOf(this.getClass))
    System.setProperty("SPARK_JAR", jarOf(SparkContext.getClass))
    System.setProperty("spark.driver.port", "49014")
    val conf = new SparkConf()
    conf.setAppName("serp-api")
    conf.setMaster("yarn-client")
    conf.set("spark.httpBroadcast.port", "35660")
    conf.set("spark.fileserver.port", "35661")
    conf.setJars(SparkContext.jarOfClass(this.getClass))
    // Distinct input sizes: each one is generated once and benchmarked at every iteration count.
    val numbers = List(10000000L, 100000000L, 1000000000L)
    for (numberOfElements: NumberOfElements <- numbers) {
      // The input depends only on the size, so write it once and let every iteration count
      // (and both implementations) cluster the very same file.
      prepareData(numberOfElements)
      for (numberOfIterations: NumberOfIterations <- 10 until 80 by 30) {
        println(s"------------------------------------- ${numberOfElements} ${numberOfIterations}")
        // Measured window includes SparkContext start-up and shutdown, not only training.
        val sparkTime: SparkTime = elapsedSeconds(runSparkKMeans(conf, numberOfIterations))
        // Remove whatever the previous Mahout run left in its output path before reusing it.
        fs.delete(outputDataPath, true)
        val mahoutTime: MahoutTime = elapsedSeconds {
          Job.run(configuration, inputDataPath, outputDataPath, new EuclideanDistanceMeasure, 10, 0.5, numberOfIterations)
        }
        result += ((numberOfIterations, numberOfElements, mahoutTime, sparkTime))
        // Presumably re-printed after every run so partial results are visible if a later run fails.
        reportResults(result.toList)
      }
    }
  }

  /**
   * Returns the path of the jar that contains `cls`.
   *
   * @throws IllegalStateException if `cls` was not loaded from a jar (e.g. run from an IDE)
   */
  private def jarOf(cls: Class[_]): String =
    SparkContext.jarOfClass(cls).headOption.getOrElse(
      throw new IllegalStateException(s"No jar found for ${cls.getName}; run the benchmark from a packaged jar"))

  /** Evaluates `body` once and returns the elapsed time in whole seconds (truncated). */
  private def elapsedSeconds(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000000L
  }

  /**
   * Clusters the points in [[inputDataPath]] with MLlib k-means (k = 10, 1 run, random
   * initialisation) inside a fresh SparkContext, which is stopped even if training throws.
   */
  private def runSparkKMeans(conf: SparkConf, numberOfIterations: Int): Unit = {
    val spark = new SparkContext(conf)
    try {
      // NOTE(review): DISK_ONLY means every k-means pass reads the cached points back from
      // disk; compare with MEMORY_AND_DISK when interpreting the timings.
      val input = spark.textFile(inputDataPath.toString)
        .map(line => line.split(" ").map(_.toDouble))
        .persist(StorageLevel.DISK_ONLY)
      KMeans.train(input, 10, numberOfIterations, 1, KMeans.RANDOM)
    } finally {
      spark.stop()
    }
  }

  /** Writes every accumulated result row to the logger first, then to stdout. */
  private def reportResults(results: List[(Int, Long, Long, Long)]): Unit = {
    val lines = results.map { case (iterations, elements, mahoutTime, sparkTime) =>
      s"Number of iterations= ${iterations}, number of elements = ${elements}, mahouttime= ${mahoutTime}, spark time = ${sparkTime}"
    }
    lines.foreach(line => log.info(line))
    lines.foreach(line => println(line))
  }

  /**
   * Recreates [[benchPath]] (deleting everything under it) and writes `numberOfElements`
   * lines to [[inputDataPath]]. Each line holds [[PointDimensions]] space-separated values
   * from [[nextRandom]]. The output stream is closed even if writing fails.
   *
   * @param numberOfElements number of points (lines) to write; values <= 0 produce an empty file
   */
  def prepareData(numberOfElements: Long): Unit = {
    fs.delete(benchPath, true)
    fs.mkdirs(benchPath)
    val output = fs.create(inputDataPath)
    try {
      // A counter loop instead of `0L until n`: a NumericRange cannot hold more than
      // Int.MaxValue elements, which would rule out sizes above ~2.1e9.
      var written = 0L
      while (written < numberOfElements) {
        output.writeBytes(Seq.fill(PointDimensions)(nextRandom).mkString("", " ", "\n"))
        written += 1
      }
    } finally {
      output.close()
    }
  }

  /**
   * One random coordinate: a standard Gaussian sample scaled by 10e5 (= 1e6), minus an offset
   * drawn uniformly from {0, 1e5, ..., 9e5}.
   *
   * NOTE(review): each coordinate is drawn independently, so the generated points probably do
   * not form 10 well-separated clusters. Confirm this is intended for the benchmark.
   */
  def nextRandom: Double = {
    Random.nextGaussian() * 10e5 - Random.nextInt(10) * 10e4
  }
}