Thank you, Feynman, for your response. Since I am very new to Scala, I may need a bit more hand-holding at this stage.

I have been able to incorporate your suggestion about sorting, and it now works perfectly. Thanks again for that.

I tried to follow your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each deviceid (the key), my goal is to get a vector of doubles on which I can query the mean and standard deviation. Because RDDs are immutable, I cannot use a foreach loop to iterate through the groupBy results and add the values to an RDD one at a time; Spark does not allow that. Instead, I need to apply RDD transformations directly to the entire set, and this is where I am faltering, since I am not yet used to the lambda expressions that Scala uses.
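
If I understand the general idea, everything has to be expressed as a transformation on the RDD rather than as mutation inside a loop. My mental sketch (just my guess at the shape of it, not working code; byteVectors is a name I made up, and deviceIdsMap is the grouped RDD from my code below) is something like:

// Sketch only: build the per-device vector of byte values as a transformation,
// instead of trying to mutate anything inside foreach.
val byteVectors = deviceIdsMap.mapValues { aggs =>
  Vectors.dense(aggs.toArray.sorted.map(_.bytes.toDouble))
}

but I get lost as soon as the statistics enter the picture. Here is my full code as it stands now: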

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)? Anything wrong with that?

    // All I need to do below is collect the vector of bytes for each device and store it in an RDD.
    // The problem with the 'foreach' approach below is that it produces the vector values one at a time,
    // and I cannot add them individually to an immutable RDD.
    deviceIdsMap.foreach(a => {
      val deviceId = a._1       // the device ID (the key)
      val allAggregates = a._2  // all aggregate records for this device

      val sortedAggregates = allAggregates.toArray
      Sorting.quickSort(sortedAggregates)  // sorts by eventdate, since DailyDeviceAggregates is Ordered

      val byteValues = sortedAggregates.map(dda => dda.bytes.toDouble)
      val count = byteValues.length
      val sum = byteValues.sum
      val xbar = sum / count
      val sumOfSquaredDeviations = byteValues.map(x => (x - xbar) * (x - xbar)).sum
      val stddev = math.sqrt(sumOfSquaredDeviations / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(deviceId + "," + xbar + "," + stddev)
    })
    // My stalled MLlib attempt; this does not compile because Statistics.colStats expects an RDD[Vector], not a single Vector:
    //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)

    sc.stop()
  }
}
Can you show me how to write the 'foreach' loop in a Spark-friendly way?
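
In case it helps you point out where I go wrong, here is my best guess at the aggregate you described, using a MultivariateOnlineSummarizer as the zero value with its add and merge functions. The names summaries and stats are my own inventions, and I am only guessing that the output path would come from args(1):

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// One summarizer per device_id, built up with add() and combined with merge().
val summaries = deviceAggregateLogs
  .map(dda => (dda.device_id, Vectors.dense(dda.bytes.toDouble)))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summarizer, sample) => summarizer.add(sample),  // fold one observation into a partition's summarizer
    (s1, s2) => s1.merge(s2)                         // combine summarizers across partitions
  )

// Unpack the per-device mean and standard deviation, then write to HDFS.
// (If I read the docs right, variance is the unbiased n-1 sample variance,
// so it would differ slightly from my divide-by-n version above.)
val stats = summaries.mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))
stats.saveAsTextFile(args(1))

Does this look anything like what you had in mind, or should I be calling aggregate on the grouped RDD instead? Thanks a lot for your help.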

Anupam Bagchi


> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com> wrote:
> 
> > The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> allaggregates.toArray allocates and creates a new array, separate from allaggregates; it is that new array that Sorting.quickSort sorts, while allaggregates itself is left unsorted. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
> > I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer.
> > For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer.
> > At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
> 
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:
> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
> 1. Read the dataset from HDFS. A few sample lines look like this:
> deviceid,bytes,eventdate
> 15590657,246620,20150630
> 14066921,1907,20150621
> 14066921,1906,20150626
> 6522013,2349,20150626
> 6522013,2525,20150613
> 2. Group the data by device id. Thus we now have a map of deviceid => (bytes, eventdate).
> 3. For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
> 4. Pick the last 30 days of bytes from this ordered set.
> 5. Find the moving average of bytes for the last date using a time period of 30.
> 6. Find the standard deviation of the bytes for the final date using a time period of 30.
> 7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev). [Assume k = 3.]
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it ultimately has to run on a billion rows.
> Here is the data structure for the dataset.
> package com.testing
> case class DailyDeviceAggregates (
>                         device_id: Integer,
>                         bytes: Long,
>                         eventdate: Integer
>                    ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
> The DeviceAnalyzer object looks like this. I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
> 
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> 
> import scala.util.Sorting
> 
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
> 
>     val logFile = args(0)
> 
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> 
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
> 
>       println(allaggregates)
>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       println(allaggregates) // This does not work - results are not sorted !!
> 
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
> 
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
> 
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
> 
>     sc.stop()
>   }
> }
> I would really appreciate it if someone could suggest improvements for the following:
> 1. The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> 2. I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> 3. For that I would need to keep all my intermediate values as RDDs so that I can directly use the RDD methods to do the job.
> 4. At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class, which is another reason I would like to retain everything as RDDs.
> 
> Thanks in advance for your help.
> 
> Anupam Bagchi
> 
