I have to do the following tasks on a dataset using Apache Spark with Scala as
the programming language:
Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
Group the data by device id. Thus we now have a map of deviceid =>
(bytes,eventdate)
For each device, sort the set by eventdate. We now have an ordered set of bytes
based on eventdate for each device.
Pick the last 30 days of bytes from this ordered set.
Find the moving average of bytes for the last date using a time period of 30.
Find the standard deviation of the bytes for the final date using a time period
of 30.
Return two values in the result: (mean - k*stddev) and (mean + k*stddev), assuming k = 3 (sketched below).
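To make that concrete, the per-device calculation boils down to this plain-Scala sketch (the helper name bounds and its defaults are just my illustration, and it assumes at least one day of data):

def bounds(bytesByDate: Seq[Long], window: Int = 30, k: Double = 3.0): (Double, Double) = {
  // bytesByDate must already be sorted by eventdate; take the last `window` days
  val recent = bytesByDate.takeRight(window).map(_.toDouble)
  val mean   = recent.sum / recent.length
  val stddev = math.sqrt(recent.map(x => (x - mean) * (x - mean)).sum / recent.length)
  (mean - k * stddev, mean + k * stddev) // the two values to return
}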
I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually has to run on a billion rows.
Here is the data structure for the dataset.
package com.testing

case class DailyDeviceAggregates (
  device_id: Integer,
  bytes: Long,
  eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
  // Ascending order by eventdate; the date values are small enough that Int subtraction cannot overflow
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}

object DailyDeviceAggregates {
  // Parses one "deviceid,bytes,eventdate" line into a DailyDeviceAggregates instance
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
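As a quick sanity check, parsing two of the sample lines above should behave like this (illustrative snippet only):

val a = DailyDeviceAggregates.parseLogLine("14066921,1907,20150621")
val b = DailyDeviceAggregates.parseLogLine("14066921,1906,20150626")
assert(a.compare(b) < 0) // 20150621 sorts before 20150626

val arr = Array(b, a)
scala.util.Sorting.quickSort(arr) // sorts the array itself, in place
// arr is now ordered by eventdate: first a, then b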
The DeviceAnalyzer object is where the work happens. I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1     // the device ID
      val allaggregates = a._2 // all device-aggregates for this device
      println(allaggregates)
      // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      Sorting.quickSort(allaggregates.toArray)
      println(allaggregates) // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
      val count = byteValues.length
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}
I would really appreciate it if someone could suggest improvements for the following:
1. The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way (see the first sketch below).
2. I would like to use the Spark MLlib class MultivariateStatisticalSummary to calculate the statistical values.
3. For that I would need to keep all my intermediate values as RDDs so that I can directly use the RDD methods to do the job.
4. At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class; this is another reason I would like to retain everything as RDDs (see the second sketch below).
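On point 1, my current guess is that allaggregates.toArray builds a brand-new array, quickSort sorts that throwaway copy in place, and allaggregates itself is never modified. If that is right, keeping a reference to the array should fix it, along these lines:

// Keep the array so the in-place sort is not lost
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)      // sorts sortedAggregates in place
println(sortedAggregates.mkString("\n")) // now ordered by eventdate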
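On points 2 to 4, one thing I have since noticed is that Statistics.colStats expects an RDD[Vector] rather than a single Vector, which may be why my commented-out line does not compile. Failing that, this is roughly the RDD-only shape I am imagining, ending with saveAsTextFile to HDFS (a sketch only: the output path args(1) is hypothetical, and I have not tried this at a billion-row scale):

// Per-device (mean - 3*stddev, mean + 3*stddev) over the last 30 days,
// computed entirely as RDD transformations and written back to HDFS
val results = deviceAggregateLogs
  .groupBy(_.device_id)
  .mapValues { aggs =>
    val sorted = aggs.toArray.sorted // uses the Ordered instance, so ascending by eventdate
    val recent = sorted.takeRight(30).map(_.bytes.toDouble)
    val mean   = recent.sum / recent.length
    val stddev = math.sqrt(recent.map(x => (x - mean) * (x - mean)).sum / recent.length)
    (mean - 3 * stddev, mean + 3 * stddev)
  }

results
  .map { case (id, (lower, upper)) => id + "," + lower + "," + upper }
  .saveAsTextFile(args(1)) // hypothetical output path on HDFS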
Thanks in advance for your help.
Anupam Bagchi