Thank you, Feynman, for your response. Since I am very new to Scala, I may need a bit more hand-holding at this stage.
I have been able to incorporate your suggestion about sorting, and it now works perfectly. Thanks again for that. I tried to use your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Because RDDs are immutable, I cannot use a foreach loop to iterate through the groupBy results and add the values to an RDD one at a time - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering, since I am not used to the lambda expressions that Scala uses.

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    // Question: Can we not write the line above as
    //   deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)   // Anything wrong?

    // All I need to do below is collect the vector of bytes for each device and store it in the RDD.
    // The problem with the 'foreach' approach below is that it generates the vector values one at a
    // time, which I cannot add individually to an immutable RDD.
    deviceIdsMap.foreach(a => {
      val device_id = a._1     // This is the device ID
      val allaggregates = a._2 // This is an array of all device-aggregates for this device

      val sortedaggregates = allaggregates.toArray
      Sorting.quickSort(sortedaggregates)

      val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)
    })

    //val vector: Vector = Vectors.dense(byteValues)
    //println(vector)
    //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)

    sc.stop()
  }
}

Can you show me how to write the 'foreach' loop in a Spark-friendly way? (I have put a rough, untested sketch of what I am imagining at the very bottom of this mail, below the quoted thread.) Thanks a lot for your help.

Anupam Bagchi

> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com> wrote:
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> allaggregates.toArray allocates and creates a new array separate from allaggregates, and it is that new array which is sorted by Sorting.quickSort; allaggregates itself is left unsorted. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer.
> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer.
> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
>
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bag...@rocketmail.com <mailto:anupam_bag...@rocketmail.com>> wrote:
> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
> Read the dataset from HDFS. A few sample lines look like this:
> deviceid,bytes,eventdate
> 15590657,246620,20150630
> 14066921,1907,20150621
> 14066921,1906,20150626
> 6522013,2349,20150626
> 6522013,2525,20150613
> Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate).
> For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
> Pick the last 30 days of bytes from this ordered set.
> Find the moving average of bytes for the last date using a time period of 30.
> Find the standard deviation of the bytes for the final date using a time period of 30.
> Return two values in the result: (mean - k*stddev) and (mean + k*stddev) [Assume k = 3].
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.
> Here is the data structure for the dataset.
>
> package com.testing
> case class DailyDeviceAggregates (
>   device_id: Integer,
>   bytes: Long,
>   eventdate: Integer
> ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
>
> The DeviceAnalyzer class looks like this:
> I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>
> import scala.util.Sorting
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1     // This is the device ID
>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>
>       println(allaggregates)
>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       println(allaggregates) // This does not work - results are not sorted !!
>
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
>
>     sc.stop()
>   }
> }
>
> I would really appreciate it if someone could suggest improvements for the following:
> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>
> Thanks in advance for your help.
>
> Anupam Bagchi
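Here is the rough sketch I mentioned above of what I imagine the Spark-friendly replacement for my foreach would look like. It is untested, so please correct me where I have it wrong. It keeps everything inside RDDs by using mapValues instead of foreach, sorts each device's records the way you showed me, keeps only the most recent 30 records (one per day in this dataset), and pushes the byte values through a MultivariateOnlineSummarizer. I am also assuming the HDFS output path would come in as a second command-line argument, args(1).

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import scala.util.Sorting

val statsByDevice = deviceAggregateLogs
  .groupBy(_.device_id)
  .mapValues { allaggregates =>
    // Sort this device's aggregates by eventdate (via the Ordered trait) and
    // keep the most recent 30 records. This sorting and the foreach below run
    // over a plain local array inside one task, not over an RDD, so mutation
    // is fine here.
    val sortedAggregates = allaggregates.toArray
    Sorting.quickSort(sortedAggregates)
    val last30 = sortedAggregates.takeRight(30)

    // Feed the byte counts into the summarizer one sample at a time.
    val summarizer = new MultivariateOnlineSummarizer()
    last30.foreach(dda => summarizer.add(Vectors.dense(dda.bytes.toDouble)))

    val mean = summarizer.mean(0)                  // 1-element vectors, so index 0
    val stddev = math.sqrt(summarizer.variance(0))
    (mean - 3 * stddev, mean + 3 * stddev)         // the (mean - k*stddev, mean + k*stddev) band with k = 3
  }

// Write one line per device to HDFS instead of println-ing on the executors.
statsByDevice
  .map { case (deviceId, (lower, upper)) => s"$deviceId,$lower,$upper" }
  .saveAsTextFile(args(1))                         // assumed output path argument

Does that look like what you had in mind, or is there a more idiomatic way to structure it?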
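Also, if the 30-day window turns out not to matter (i.e. statistics over all of a device's records), I am guessing this is the aggregate with add and merge that you were pointing at, which would avoid the groupBy entirely. Again untested and just my best reading of the API:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// Fold every record into a per-device MultivariateOnlineSummarizer:
//   - the first function adds one sample to a partition-local summarizer
//   - the second merges summarizers coming from different partitions
val summariesByDevice = deviceAggregateLogs
  .map(dda => (dda.device_id, dda.bytes.toDouble))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summarizer, bytes) => summarizer.add(Vectors.dense(bytes)),
    (s1, s2) => s1.merge(s2))

// Unpack mean and standard deviation per device before writing out.
val meanAndStddev = summariesByDevice.mapValues { s =>
  (s.mean(0), math.sqrt(s.variance(0)))
}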