Thank you, Feynman, for the lead.
I was able to modify the code using clues from the RegressionMetrics example.
Here is what I have now:
val deviceAggregateLogs =
  sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes-transferred
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
println(deviceIdsMap.collect().deep.mkString("\n"))

val summary: MultivariateStatisticalSummary = {
  val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
    case (deviceId, allaggregates) => Vectors.dense({
      val sortedAggregates = allaggregates.toArray
      Sorting.quickSort(sortedAggregates)
      sortedAggregates.map(dda => dda.bytes.toDouble)
    })
  }.aggregate(new MultivariateOnlineSummarizer())(
    // Not sure if this is really what I want, it just came from the example
    (summary, v) => summary.add(v),
    (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
  )
  summary
}
It compiles fine, but I now get the following exception at runtime:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
    at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
    at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
    at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
    at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
I can't tell exactly where I went wrong. Also, how do I take the
MultivariateOnlineSummarizer object and write it to HDFS? I have the
MultivariateOnlineSummarizer object with me, but I really need an RDD to call
saveAsTextFile() on it.
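
For reference, one possible reading of the error, offered as a sketch and not a confirmed fix: each device contributes a vector whose length equals its number of records (8 for one device, 14 for another), while MultivariateOnlineSummarizer expects every sample it is given to have the same dimension. The sketch below instead treats each bytes value as a one-dimensional sample and keeps one summarizer per device with aggregateByKey, then unpacks the mean and standard deviation so the result is still an RDD that saveAsTextFile() can write. DeviceRecord, perDeviceStats, and the argument layout in main are hypothetical names used only for illustration; sorting by eventdate and the 30-day window are not handled here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD

object PerDeviceStatsSketch {
  // Hypothetical stand-in for the record type used in this thread.
  case class DeviceRecord(device_id: Int, bytes: Long, eventdate: Int)

  // Returns (device_id, mean, stddev) of bytes over all records of each device.
  // Assumes Spark 1.3+, where the pair-RDD implicits are in scope automatically;
  // on older versions, import org.apache.spark.SparkContext._ as well.
  def perDeviceStats(records: RDD[DeviceRecord]): RDD[(Int, Double, Double)] =
    records
      // Every sample is a vector of dimension 1, so dimensions always match.
      .map(r => (r.device_id, Vectors.dense(r.bytes.toDouble)))
      .aggregateByKey(new MultivariateOnlineSummarizer())(
        (summarizer, v) => summarizer.add(v), // fold one sample into a partition-local summary
        (s1, s2) => s1.merge(s2)              // combine summaries from different partitions
      )
      // variance is the unbiased sample variance (divides by n - 1).
      .map { case (id, s) => (id, s.mean(0), math.sqrt(s.variance(0))) }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PerDeviceStatsSketch"))
    // Assumption: args(0) is the input path and args(1) the output directory.
    val records = sc.textFile(args(0))
      .filter(line => line.nonEmpty && line.head.isDigit) // skip a header line, if any
      .map { line =>
        val c = line.split(",")
        DeviceRecord(c(0).toInt, c(1).toLong, c(2).toInt)
      }
    perDeviceStats(records)
      .map { case (id, mean, stddev) => s"$id,$mean,$stddev" }
      .saveAsTextFile(args(1))
    sc.stop()
  }
}
```

Because the summarizer reports the sample variance, the standard deviation here will differ slightly from the hand-written loop earlier in the thread, which divides by the count.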
Anupam Bagchi
(c) 408.431.0780 (h) 408-873-7909
> On Jul 13, 2015, at 4:52 PM, Feynman Liang <[email protected]> wrote:
>
> A good example is RegressionMetrics
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
> use of MultivariateOnlineSummarizer to aggregate statistics across labels
> and residuals; take a look at how aggregateByKey is used there.
>
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <[email protected]
> <mailto:[email protected]>> wrote:
> Thank you Feynman for your response. Since I am very new to Scala I may need
> a bit more hand-holding at this stage.
>
> I have been able to incorporate your suggestion about sorting - and it now
> works perfectly. Thanks again for that.
>
> I tried to use your suggestion of using MultivariateOnlineSummarizer, but
> could not proceed further. For each deviceid (the key) my goal is to get a
> vector of doubles on which I can query the mean and standard deviation. Now
> because RDDs are immutable, I cannot use a foreach loop to iterate through
> the groupby results and individually add the values in an RDD - Spark does
> not allow that. I need to apply the RDD functions directly on the entire set
> to achieve the transformations I need. This is where I am faltering since I
> am not used to the lambda expressions that Scala uses.
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs =
>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>     // Question: Can we not write the line above as
>     // deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>     // All I need to do below is collect the vector of bytes for each device
>     // and store it in the RDD
>     // The problem with the ‘foreach’ approach below, is that it generates
>     // the vector values one at a time, which I cannot
>     // add individually to an immutable RDD
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1 // This is the device ID
>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>
>       val sortedaggregates = allaggregates.toArray
>       Sorting.quickSort(sortedaggregates)
>
>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>     })
>     //val vector: Vector = Vectors.dense(byteValues)
>     //println(vector)
>     //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>
>
>     sc.stop()
>   }
> }
> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
> Thanks a lot for your help.
>
> Anupam Bagchi
>
>
>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>>> The call to Sorting.quicksort is not working. Perhaps I am calling it the
>>> wrong way.
>> allaggregates.toArray allocates and creates a new array separate from
>> allaggregates, and it is that new array which is sorted by
>> Sorting.quickSort; allaggregates itself is left unchanged. Try:
>>     val sortedAggregates = allaggregates.toArray
>>     Sorting.quickSort(sortedAggregates)
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to
>>> calculate the statistical values.
>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you
>> probably want to use MultivariateOnlineSummarizer.
>>> For that I would need to keep all my intermediate values as RDD so that I
>>> can directly use the RDD methods to do the job.
>> Correct; you would do an aggregate using the add and merge functions
>> provided by MultivariateOnlineSummarizer.
>>> At the end I also need to write the results to HDFS for which there is a
>>> method provided on the RDD class to do so, which is another reason I would
>>> like to retain everything as RDD.
>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or
>> you could unpack the relevant statistics from MultivariateOnlineSummarizer
>> into an array/tuple using a mapValues first and then write.
>>
>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi
>> <[email protected] <mailto:[email protected]>> wrote:
>> I have to do the following tasks on a dataset using Apache Spark with Scala
>> as the programming language:
>> 1. Read the dataset from HDFS. A few sample lines look like this:
>>      deviceid,bytes,eventdate
>>      15590657,246620,20150630
>>      14066921,1907,20150621
>>      14066921,1906,20150626
>>      6522013,2349,20150626
>>      6522013,2525,20150613
>> 2. Group the data by device id. Thus we now have a map of deviceid =>
>>    (bytes,eventdate)
>> 3. For each device, sort the set by eventdate. We now have an ordered set of
>>    bytes based on eventdate for each device.
>> 4. Pick the last 30 days of bytes from this ordered set.
>> 5. Find the moving average of bytes for the last date using a time period
>>    of 30.
>> 6. Find the standard deviation of the bytes for the final date using a time
>>    period of 30.
>> 7. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>>    [Assume k = 3]
>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
>> run on a billion rows finally.
>> Here is the data structure for the dataset.
>> package com.testing
>>
>> case class DailyDeviceAggregates (
>>   device_id: Integer,
>>   bytes: Long,
>>   eventdate: Integer
>> ) extends Ordered[DailyDeviceAggregates] {
>>   // Order records by event date
>>   def compare(that: DailyDeviceAggregates): Int = {
>>     eventdate - that.eventdate
>>   }
>> }
>>
>> object DailyDeviceAggregates {
>>   // Parse one CSV line of the form deviceid,bytes,eventdate
>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>     val c = logline.split(",")
>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>   }
>> }
>> The DeviceAnalyzer class looks like this:
>> I have a very crude implementation that does the job, but it is not up to
>> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>> basic. Here is what I have now:
>>
>> import com.testing.DailyDeviceAggregates
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>
>> import scala.util.Sorting
>>
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>>
>>     val logFile = args(0)
>>
>>     val deviceAggregateLogs =
>>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1 // This is the device ID
>>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>>
>>       println(allaggregates)
>>       // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>       Sorting.quickSort(allaggregates.toArray)
>>       println(allaggregates) // This does not work - results are not sorted !!
>>
>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>>
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>     })
>>
>>     sc.stop()
>>   }
>> }
>> I would really appreciate it if someone could suggest improvements for the
>> following:
>> 1. The call to Sorting.quicksort is not working. Perhaps I am calling it the
>>    wrong way.
>> 2. I would like to use the Spark mllib class MultivariateStatisticalSummary
>>    to calculate the statistical values.
>> 3. For that I would need to keep all my intermediate values as RDD so that I
>>    can directly use the RDD methods to do the job.
>> 4. At the end I also need to write the results to HDFS for which there is a
>>    method provided on the RDD class to do so, which is another reason I
>>    would like to retain everything as RDD.
>>
>> Thanks in advance for your help.
>>
>> Anupam Bagchi
>>
>>
>
>