Thank you Feynman for the lead.

I was able to modify the code using clues from the RegressionMetrics example. 
Here is what I have now.

val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes-transferred
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
println(deviceIdsMap.collect().deep.mkString("\n"))

val summary: MultivariateStatisticalSummary = {
  val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
    case (deviceId, allaggregates) => Vectors.dense({
      val sortedAggregates = allaggregates.toArray
      Sorting.quickSort(sortedAggregates)
      sortedAggregates.map(dda => dda.bytes.toDouble)
    })
  }.aggregate(new MultivariateOnlineSummarizer())(
    (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
    (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
  )
  summary
}
It compiles fine, but I am now getting the following exception at runtime.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

I can’t tell where exactly I went wrong. Also, how do I take the 
MultivariateOnlineSummarizer object and write it to HDFS? I have the 
MultivariateOnlineSummarizer object with me, but I really need an RDD to call 
saveAsTextFile() on it.
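
The only approach I can think of is to wrap the extracted numbers in a 
one-element RDD just so that saveAsTextFile() becomes available - a rough 
sketch of the idea (the output path is made up):

val statsLine = s"${summary.mean(0)},${math.sqrt(summary.variance(0))}"  // e.g. mean and stddev of the first column
sc.parallelize(Seq(statsLine)).saveAsTextFile("hdfs:///tmp/device-stats")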

Anupam Bagchi
(c) 408.431.0780 (h) 408-873-7909

> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fli...@databricks.com> wrote:
> 
> A good example is RegressionMetrics 
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s 
> use of MultivariateOnlineSummarizer to aggregate statistics across labels 
> and residuals; take a look at how aggregateByKey is used there.
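> For per-device statistics, a rough (untested) sketch of that pattern keyed by 
> device could look like the following; feeding each bytes value in as its own 
> 1-dimensional sample keeps every vector the same size:
> 
>     import org.apache.spark.mllib.linalg.Vectors
>     import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
> 
>     // one 1-dimensional sample per log line, keyed by device_id
>     val perDeviceSummaries = deviceAggregateLogs
>       .map(dda => (dda.device_id, Vectors.dense(dda.bytes.toDouble)))
>       .aggregateByKey(new MultivariateOnlineSummarizer())(
>         (summarizer, v) => summarizer.add(v),  // fold each sample into the running summarizer
>         (s1, s2) => s1.merge(s2)               // combine partial summarizers across partitions
>       )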
> 
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bag...@rocketmail.com 
> <mailto:anupam_bag...@rocketmail.com>> wrote:
> Thank you Feynman for your response. Since I am very new to Scala I may need 
> a bit more hand-holding at this stage.
> 
> I have been able to incorporate your suggestion about sorting - and it now 
> works perfectly. Thanks again for that.
> 
> I tried to use your suggestion of using MultivariateOnlineSummarizer, but 
> could not proceed further. For each deviceid (the key) my goal is to get a 
> vector of doubles on which I can query the mean and standard deviation. Now, 
> because RDDs are immutable, I cannot use a foreach loop to iterate through 
> the groupBy results and individually add the values to an RDD - Spark does 
> not allow that. I need to apply the RDD functions directly on the entire set 
> to achieve the transformations I need. This is where I am faltering, since I 
> am not used to the lambda expressions that Scala uses.
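> Is the Spark-friendly way something like a mapValues over the grouped 
> results? A rough sketch of what I am guessing at (quite possibly wrong):
> 
>     val deviceStats = deviceIdsMap.mapValues { allaggregates =>
>       val bytes = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val mean = bytes.sum / bytes.length
>       val stddev = math.sqrt(bytes.map(x => (x - mean) * (x - mean)).sum / bytes.length)
>       (mean, stddev)  // per-device (mean, stddev) pair
>     }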
> 
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
> 
>     val logFile = args(0)
> 
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>     // Question: Can we not write the line above as
>     // deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)? Anything wrong?
>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD.
>     // The problem with the 'foreach' approach below is that it generates the vector values
>     // one at a time, which I cannot add individually to an immutable RDD.
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
> 
>       val sortedaggregates = allaggregates.toArray
>       Sorting.quickSort(sortedaggregates)
> 
>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
> 
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>     })
>     //val vector: Vector = Vectors.dense(byteValues)
>     //println(vector)
>     //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
> 
>     sc.stop()
>   }
> }
> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? 
> Thanks a lot for your help.
> 
> Anupam Bagchi
> 
> 
>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com 
>> <mailto:fli...@databricks.com>> wrote:
>> 
>> The call to Sorting.quickSort is not working. Perhaps I am calling it the 
>> wrong way.
>> allaggregates.toArray allocates a new array separate from allaggregates, 
>> and that copy is what Sorting.quickSort sorts; allaggregates itself is 
>> left unsorted. Try:
>> val sortedAggregates = allaggregates.toArray
>> Sorting.quickSort(sortedAggregates)
>> I would like to use the Spark mllib class MultivariateStatisticalSummary to 
>> calculate the statistical values.
>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you 
>> probably want to use MultivariateOnlineSummarizer. 
>> For that I would need to keep all my intermediate values as RDD so that I 
>> can directly use the RDD methods to do the job.
>> Correct; you would do an aggregate using the add and merge functions 
>> provided by MultivariateOnlineSummarizer 
>> At the end I also need to write the results to HDFS for which there is a 
>> method provided on the RDD class to do so, which is another reason I would 
>> like to retain everything as RDD.
>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or 
>> you could unpack the relevant statistics from MultivariateOnlineSummarizer 
>> into an array/tuple using a mapValues first and then write.
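>> For instance (a rough, untested sketch; summaries stands for an 
>> RDD[(device_id, MultivariateOnlineSummarizer)] from the aggregation step, 
>> and the output path is made up):
>> 
>>     summaries.mapValues { s =>
>>       (s.mean(0), math.sqrt(s.variance(0)))  // per-device mean and stddev of bytes
>>     }.map { case (deviceId, (mean, stddev)) =>
>>       s"$deviceId,$mean,$stddev"
>>     }.saveAsTextFile("hdfs:///tmp/device-stats")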
>> 
>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi 
>> <anupam_bag...@rocketmail.com <mailto:anupam_bag...@rocketmail.com>> wrote:
>> I have to do the following tasks on a dataset using Apache Spark with Scala 
>> as the programming language:
>> 1. Read the dataset from HDFS. A few sample lines look like this:
>>    deviceid,bytes,eventdate
>>    15590657,246620,20150630
>>    14066921,1907,20150621
>>    14066921,1906,20150626
>>    6522013,2349,20150626
>>    6522013,2525,20150613
>> 2. Group the data by device id. Thus we now have a map of deviceid => (bytes, eventdate).
>> 3. For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
>> 4. Pick the last 30 days of bytes from this ordered set.
>> 5. Find the moving average of bytes for the last date using a time period of 30.
>> 6. Find the standard deviation of the bytes for the final date using a time period of 30.
>> 7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev) [assume k = 3]. (A rough sketch of steps 4-7 follows this list.)
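>> To be concrete, steps 4-7 per device amount to something like this (rough 
>> sketch; sortedBytes stands for that device's byte counts as Doubles, 
>> already ordered by eventdate):
>> 
>>     val k = 3
>>     val window = sortedBytes.takeRight(30)   // last 30 days of bytes
>>     val mean = window.sum / window.length    // moving average at the last date
>>     val stddev = math.sqrt(window.map(x => (x - mean) * (x - mean)).sum / window.length)
>>     val (lower, upper) = (mean - k * stddev, mean + k * stddev)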
>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it will 
>> eventually have to run on a billion rows.
>> Here is the data structure for the dataset.
>> package com.testing
>> case class DailyDeviceAggregates (
>>     device_id: Integer,
>>     bytes: Long,
>>     eventdate: Integer
>> ) extends Ordered[DailyDeviceAggregates] {
>>   def compare(that: DailyDeviceAggregates): Int = {
>>     eventdate - that.eventdate
>>   }
>> }
>> object DailyDeviceAggregates {
>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>     val c = logline.split(",")
>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>   }
>> }
>> The DeviceAnalyzer object is shown below. I have a very crude 
>> implementation that does the job, but it is not up to the mark. Sorry, I am 
>> very new to Scala/Spark, so my questions are quite basic. Here is what I 
>> have now:
>> 
>> import com.testing.DailyDeviceAggregates
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.mllib.linalg.Vector
>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> 
>> import scala.util.Sorting
>> 
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>> 
>>     val logFile = args(0)
>> 
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>> 
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> 
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1  // This is the device ID
>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>> 
>>       println(allaggregates)
>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>       println(allaggregates) // This does not work - results are not sorted !!
>> 
>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>> 
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>> 
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>     })
>> 
>>     sc.stop()
>>   }
>> }
>> I would really appreciate it if someone could suggest improvements for the following:
>> 1. The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
>> 2. I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>> 3. For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>> 4. At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>> 
>> Thanks in advance for your help.
>> 
>> Anupam Bagchi
>>  
>> 
> 
> 
