Thank you, Feynman, for your response. Since I am very new to Scala, I may need a
bit more hand-holding at this stage.
I have been able to incorporate your suggestion about sorting - and it now
works perfectly. Thanks again for that.
I tried to use your suggestion of using MultivariateOnlineSummarizer, but could
not proceed further. For each deviceid (the key), my goal is to get a vector of
doubles on which I can query the mean and standard deviation. Because RDDs are
immutable, I cannot use a foreach loop to iterate through the groupBy results
and add the values to an RDD one at a time - Spark does not allow that. I need
to apply the RDD transformations directly to the entire set to achieve what I
need. This is where I am faltering, since I am not used to the lambda
expressions that Scala uses.
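To make the goal concrete: after the groupBy, I believe I have an
RDD[(Integer, Iterable[DailyDeviceAggregates])], and the per-device computation
I am after looks roughly like the sketch below. This is only my guess at the
shape, not working code:

// My guess (untested): keep everything as RDD transformations by
// computing the per-device statistics inside a mapValues.
val statsByDevice = deviceAggregateLogs
  .groupBy(_.device_id) // RDD[(Integer, Iterable[DailyDeviceAggregates])]
  .mapValues { aggregates =>
    val bytes = aggregates.toArray.sorted.map(_.bytes.toDouble) // sorted by eventdate via Ordered
    val mean = bytes.sum / bytes.length
    val stddev = math.sqrt(bytes.map(x => (x - mean) * (x - mean)).sum / bytes.length)
    (mean, stddev)
  }

My full current attempt follows: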
object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    // Question: Can we not write the line above as
    //   deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)
    // Anything wrong?

    // All I need to do below is collect the vector of bytes for each device and store it in an RDD.
    // The problem with the 'foreach' approach below is that it generates the vector values one at a
    // time, which I cannot add individually to an immutable RDD.
    deviceIdsMap.foreach(a => {
      val device_id = a._1     // This is the device ID
      val allaggregates = a._2 // This is an iterable of all device-aggregates for this device

      val sortedaggregates = allaggregates.toArray
      Sorting.quickSort(sortedaggregates) // Sort by eventdate via the Ordered trait

      val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble)
      val count = byteValues.length
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)
    })
    //val vector: Vector = Vectors.dense(byteValues)
    //println(vector)
    //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)

    sc.stop()
  }
}
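Based on your earlier suggestion, I also tried to sketch the aggregation with
MultivariateOnlineSummarizer and aggregateByKey, but I am not at all confident
about the add/merge usage (again a guess, not tested):

// needs: import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
// My reading of your suggestion: one summarizer per device, feeding each
// bytes value in as a 1-dimensional vector.
val summaries = deviceAggregateLogs
  .map(dda => (dda.device_id, dda.bytes.toDouble))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summarizer, value) => summarizer.add(Vectors.dense(value)), // add one observation
    (s1, s2) => s1.merge(s2)                                     // merge partial summaries
  )
// Unpack the statistics and write out; args(1) here is a hypothetical
// HDFS output path.
summaries.mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))
  .saveAsTextFile(args(1))

Is this roughly what you had in mind?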
Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks
a lot for your help.
Anupam Bagchi
> On Jul 13, 2015, at 12:21 PM, Feynman Liang <[email protected]> wrote:
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
> wrong way.
> allaggregates.toArray allocates and creates a new array separate from
> allaggregates; it is that new array which is sorted by Sorting.quickSort,
> not allaggregates itself. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
> I would like to use the Spark mllib class MultivariateStatisticalSummary to
> calculate the statistical values.
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you
> probably want to use MultivariateOnlineSummarizer.
> For that I would need to keep all my intermediate values as RDD so that I can
> directly use the RDD methods to do the job.
> Correct; you would do an aggregate using the add and merge functions provided
> by MultivariateOnlineSummarizer.
> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or
> you could unpack the relevant statistics from MultivariateOnlineSummarizer
> into an array/tuple using a mapValues first and then write.
>
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <[email protected]> wrote:
> I have to do the following tasks on a dataset using Apache Spark with Scala
> as the programming language:
> Read the dataset from HDFS. A few sample lines look like this:
> deviceid,bytes,eventdate
> 15590657,246620,20150630
> 14066921,1907,20150621
> 14066921,1906,20150626
> 6522013,2349,20150626
> 6522013,2525,20150613
> Group the data by device id. Thus we now have a map of deviceid =>
> (bytes,eventdate)
> For each device, sort the set by eventdate. We now have an ordered set of
> bytes based on eventdate for each device.
> Pick the last 30 days of bytes from this ordered set.
> Find the moving average of bytes for the last date using a time period of 30.
> Find the standard deviation of the bytes for the final date using a time
> period of 30.
> Return two values in the result, (mean - k*stddev) and (mean + k*stddev).
> [Assume k = 3]
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually
> has to run on a billion rows.
> Here is the data structure for the dataset.
> package com.testing
> case class DailyDeviceAggregates (
>   device_id: Integer,
>   bytes: Long,
>   eventdate: Integer
> ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
> The DeviceAnalyzer class looks like this:
> I have a very crude implementation that does the job, but it is not up to the
> mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic.
> Here is what I have now:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>
> import scala.util.Sorting
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1     // This is the device ID
>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>
>       println(allaggregates)
>       // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       Sorting.quickSort(allaggregates.toArray)
>       println(allaggregates) // This does not work - results are not sorted !!
>
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
>
>     sc.stop()
>   }
> }
> I would really appreciate it if someone could suggest improvements for the
> following:
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
> wrong way.
> I would like to use the Spark mllib class MultivariateStatisticalSummary to
> calculate the statistical values.
> For that I would need to keep all my intermediate values as RDD so that I can
> directly use the RDD methods to do the job.
> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.
>
> Thanks in advance for your help.
>
> Anupam Bagchi
>
>