I have to do the following tasks on a dataset using Apache Spark with Scala as
the programming language:
- Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
- Group the data by device id. Thus we now have a map of deviceid =>
(bytes,eventdate)
- For each device, sort the set by eventdate. We now have an ordered set of
bytes based on eventdate for each device.
- Pick the last 30 days of bytes from this ordered set.
- Find the moving average of bytes for the last date using a time period of
30.
- Find the standard deviation of the bytes for the final date using a time
period of 30.
- Return two values in the result (mean - kstddev) and (mean + kstddev)
[Assume k = 3]
I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run
on a billion rows finally.Here is the data structure for the dataset.package
com.testing
case class DeviceAggregates (
device_id: Integer,
bytes: Long,
eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
def compare(that: DailyDeviceAggregates): Int = {
eventdate - that.eventdate
}
}
object DeviceAggregates {
def parseLogLine(logline: String): DailyDeviceAggregates = {
val c = logline.split(",")
DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
}
}The DeviceAnalyzer class looks like this:I have a very crude implementation
that does the job, but it is not up to the mark. Sorry, I am very new to
Scala/Spark, so my questions are quite basic. Here is what I have now:
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.util.Sorting
object DeviceAnalyzer {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Analyzer")
val sc = new SparkContext(sparkConf)
val logFile = args(0)
val deviceAggregateLogs =
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
// Calculate statistics based on bytes
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
deviceIdsMap.foreach(a => {
val device_id = a._1 // This is the device ID
val allaggregates = a._2 // This is an array of all device-aggregates
for this device
println(allaggregates)
Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of
DailyDeviceAggregates based on eventdate
println(allaggregates) // This does not work - results are not sorted !!
val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
val count = byteValues.count(A => true)
val sum = byteValues.sum
val xbar = sum / count
val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
val vector: Vector = Vectors.dense(byteValues)
println(vector)
println(device_id + "," + xbar + "," + stddev)
//val vector: Vector = Vectors.dense(byteValues)
//println(vector)
//val summary: MultivariateStatisticalSummary =
Statistics.colStats(vector)
})
sc.stop()
}
}I would really appreciate if someone can suggests improvements for the
following:
- The call to Sorting.quicksort is not working. Perhaps I am calling it the
wrong way.
- I would like to use the Spark mllib class MultivariateStatisticalSummary
to calculate the statistical values.
- For that I would need to keep all my intermediate values as RDD so that I
can directly use the RDD methods to do the job.
- At the end I also need to write the results to HDFS for which there is a
method provided on the RDD class to do so, which is another reason I would like
to retain everything as RDD.
Thanks in advance for your help.
Anupam BagchiÂ