Thanks, Feynman, for your direction.
I was able to solve this problem by calling the Spark API from Java.
Here is a code snippet that may help others who face the same challenge.

if (args.length > 2) {
    earliestEventDate = Integer.parseInt(args[2]);
} else {
    // Default to 30 days before today, formatted as yyyyMMdd
    Date now = Calendar.getInstance().getTime();
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
    earliestEventDate = Integer.parseInt(dateFormat.format(
        new Date(now.getTime() - 30L * AnalyticsConstants.ONE_DAY_IN_MILLISECONDS)));
}
System.out.println("Filtering out dates earlier than: " + earliestEventDate);

JavaRDD<String> logLines = sc.textFile(inputFile);

// Convert the text log lines to DailyDeviceAggregates objects and cache them
JavaRDD<DailyDeviceAggregates> accessLogs = logLines
    .map(Functions.PARSE_DEVICE_AGGREGATE_LINE)
    .filter(new Function<DailyDeviceAggregates, Boolean>() {
        @Override
        public Boolean call(DailyDeviceAggregates value) {
            return (value.getEventdate() >= earliestEventDate);
        }
    }).cache();
// accessLogs.saveAsTextFile("accessLogs.saved");

// Group the daily aggregates by device ID
JavaPairRDD<Object, Iterable<DailyDeviceAggregates>> groupMap =
    accessLogs.groupBy(new Function<DailyDeviceAggregates, Object>() {
        @Override
        public Object call(DailyDeviceAggregates agg) throws Exception {
            return agg.getDevice_id();
        }
    });
// groupMap.saveAsTextFile("groupedAccessLogs.saved");

// For each device, compute Bollinger-band statistics over its daily aggregates
JavaPairRDD<Object, DailyDeviceSummary> deviceCharacteristics =
    groupMap.mapValues(new Function<Iterable<DailyDeviceAggregates>, DailyDeviceSummary>() {
        @Override
        public DailyDeviceSummary call(Iterable<DailyDeviceAggregates> allDeviceDataForMonth)
                throws Exception {
            // First task is to sort the input values by eventdate
            ArrayList<DailyDeviceAggregates> arr = new ArrayList<DailyDeviceAggregates>();
            for (DailyDeviceAggregates agg : allDeviceDataForMonth) {
                arr.add(agg);
            }
            Collections.sort(arr);
            // Done sorting

            // Copy each metric into its own array, in eventdate order
            double bytesTransferred[] = new double[arr.size()];
            double bytesIn[] = new double[arr.size()];
            double bytesOut[] = new double[arr.size()];
            DailyDeviceAggregates lastAggregate = null;
            int index = 0;
            for (DailyDeviceAggregates aggregate : arr) {
                // System.out.println(aggregate);
                bytesIn[index] = aggregate.getBytes_in();
                bytesOut[index] = aggregate.getBytes_out();
                bytesTransferred[index] = aggregate.getBytes_transferred();
                index++;
                lastAggregate = aggregate;
            }

            // 30-day window, thresholds at 2 standard deviations
            BollingerBands bollingerBytesTransferred = new BollingerBands(bytesTransferred, 30, 2.0);
            BollingerBands bollingerBytesIn = new BollingerBands(bytesIn, 30, 2.0);
            BollingerBands bollingerBytesOut = new BollingerBands(bytesOut, 30, 2.0);

            return new DailyDeviceSummary(
                lastAggregate.getAccount_id(),
                lastAggregate.getDevice_id(),
                index,
                bollingerBytesIn.getLastMean(),
                bollingerBytesOut.getLastMean(),
                bollingerBytesTransferred.getLastMean(),
                bollingerBytesIn.getLastStandardDeviation(),
                bollingerBytesOut.getLastStandardDeviation(),
                bollingerBytesTransferred.getLastStandardDeviation(),
                (long) bollingerBytesIn.getLastUpperThreshold(),
                (long) bollingerBytesOut.getLastUpperThreshold(),
                (long) bollingerBytesTransferred.getLastUpperThreshold(),
                (long) bollingerBytesIn.getLastLowerThreshold(),
                (long) bollingerBytesOut.getLastLowerThreshold(),
                (long) bollingerBytesTransferred.getLastLowerThreshold());
        }
    });

// Write one summary line per device
deviceCharacteristics.values().saveAsTextFile(outputFile);
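
The BollingerBands helper used in the snippet is not part of Spark and is not shown here. A minimal sketch of what such a class might look like follows. It assumes the constructor takes the series, a window length and a multiplier k, and that the statistics are computed over the last `period` values (or over all values when fewer are available) using the population standard deviation. The class layout and the handling of short series are assumptions made for illustration, not the original implementation.

```java
/**
 * Hypothetical sketch of the BollingerBands helper referenced in the snippet.
 * Assumption: statistics cover the last `period` values, or all values
 * when the series is shorter than `period`.
 */
final class BollingerBands {
    private final double lastMean;
    private final double lastStandardDeviation;
    private final double k;

    BollingerBands(double[] values, int period, double k) {
        if (values == null || values.length == 0) {
            throw new IllegalArgumentException("values must not be empty");
        }
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        this.k = k;

        // Use only the trailing window of the series
        int window = Math.min(period, values.length);
        int start = values.length - window;

        double sum = 0.0;
        for (int i = start; i < values.length; i++) {
            sum += values[i];
        }
        double mean = sum / window;

        double squaredDiffs = 0.0;
        for (int i = start; i < values.length; i++) {
            double d = values[i] - mean;
            squaredDiffs += d * d;
        }

        this.lastMean = mean;
        // Population standard deviation: divide by the window size
        this.lastStandardDeviation = Math.sqrt(squaredDiffs / window);
    }

    double getLastMean() { return lastMean; }

    double getLastStandardDeviation() { return lastStandardDeviation; }

    // Upper band: mean + k * stddev
    double getLastUpperThreshold() { return lastMean + k * lastStandardDeviation; }

    // Lower band: mean - k * stddev
    double getLastLowerThreshold() { return lastMean - k * lastStandardDeviation; }
}
```

With this sketch, a device with fewer than 30 active days still gets a mean and thresholds from whatever days it has, which may or may not be the behaviour you want for sparse devices.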
Anupam Bagchi
> On Jul 14, 2015, at 10:21 AM, Feynman Liang <[email protected]> wrote:
>
> If your rows may have NAs in them, I would process each column individually
> by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
> the NAs, then running a summarizer over each column.
>
> Even if you have many rows, after summarizing you will only have a vector of
> length #columns.
>
> On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi <[email protected]> wrote:
> Hello Feynman,
>
> Actually in my case, the vectors I am summarizing over will not have the same
> dimension since many devices will be inactive on some days. This is at best a
> sparse matrix where we take only the active days and attempt to fit a moving
> average over it.
>
> The reason I would like to save it to HDFS is that there are really several
> million (almost a billion) devices for which this data needs to be written. I
> am writing only a few columns, but the number of rows is very large.
>
> Given the above two cases, is using MultivariateOnlineSummarizer not a good
> idea then?
>
> Anupam Bagchi
>
>
>> On Jul 13, 2015, at 7:06 PM, Feynman Liang <[email protected]> wrote:
>>
>> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>>
>> Make sure all the vectors you are summarizing over have the same dimension.
>>
>> Why would you want to write a MultivariateOnlineSummarizer object (which can
>> be represented with a couple of Doubles) into a distributed filesystem like HDFS?
>>
>> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <[email protected]> wrote:
>> Thank you Feynman for the lead.
>>
>> I was able to modify the code using clues from the RegressionMetrics
>> example. Here is what I got now.
>>
>> val deviceAggregateLogs =
>>   sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>> // Calculate statistics based on bytes-transferred
>> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> println(deviceIdsMap.collect().deep.mkString("\n"))
>>
>> val summary: MultivariateStatisticalSummary = {
>>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>>     case (deviceId, allaggregates) => Vectors.dense({
>>       val sortedAggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedAggregates)
>>       sortedAggregates.map(dda => dda.bytes.toDouble)
>>     })
>>   }.aggregate(new MultivariateOnlineSummarizer())(
>>     // Not sure if this is really what I want, it just came from the example
>>     (summary, v) => summary.add(v),
>>     (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>>   )
>>   summary
>> }
>>
>> It compiles fine. But I am now getting an exception as follows at Runtime.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>> to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure:
>> Lost task 1.0 in stage 3.0 (TID 5, localhost):
>> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
>> when adding new sample. Expecting 8 but got 14.
>>   at scala.Predef$.require(Predef.scala:233)
>>   at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>>   at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>   at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>   at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>   at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>   at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>>   at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>>   at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>>   at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>   at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>   at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>   at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   at java.lang.Thread.run(Thread.java:722)
>>
>> I can't tell where exactly I went wrong. Also, how do I take the
>> MultivariateOnlineSummarizer object and write it to HDFS? I have the
>> MultivariateOnlineSummarizer object with me, but I really need an RDD to call
>> saveAsTextFile() on it.
>>
>> Anupam Bagchi
>>
>>
>>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <[email protected]> wrote:
>>>
>>> A good example is RegressionMetrics
>>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
>>> use of MultivariateOnlineSummarizer to aggregate statistics across
>>> labels and residuals; take a look at how aggregateByKey is used there.
>>>
>>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <[email protected]> wrote:
>>> Thank you Feynman for your response. Since I am very new to Scala I may
>>> need a bit more hand-holding at this stage.
>>>
>>> I have been able to incorporate your suggestion about sorting - and it now
>>> works perfectly. Thanks again for that.
>>>
>>> I tried to follow your suggestion of using MultivariateOnlineSummarizer, but
>>> could not proceed further. For each deviceid (the key) my goal is to get a
>>> vector of doubles on which I can query the mean and standard deviation. Now
>>> because RDDs are immutable, I cannot use a foreach loop to iterate through
>>> the groupBy results and individually add the values to an RDD - Spark does
>>> not allow that. I need to apply the RDD functions directly on the entire
>>> set to achieve the transformations I need. This is where I am faltering
>>> since I am not used to the lambda expressions that Scala uses.
>>>
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     val logFile = args(0)
>>>
>>>     val deviceAggregateLogs =
>>>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>     // Question: Can we not write the line above as
>>>     // deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>>>     // All I need to do below is collect the vector of bytes for each
>>>     // device and store it in the RDD
>>>     // The problem with the 'foreach' approach below is that it generates
>>>     // the vector values one at a time, which I cannot
>>>     // add individually to an immutable RDD
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1 // This is the device ID
>>>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>>>
>>>       val sortedaggregates = allaggregates.toArray
>>>       Sorting.quickSort(sortedaggregates)
>>>
>>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>>     })
>>>     //val vector: Vector = Vectors.dense(byteValues)
>>>     //println(vector)
>>>     //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>
>>>     sc.stop()
>>>   }
>>> }
>>>
>>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
>>> Thanks a lot for your help.
>>>
>>> Anupam Bagchi
>>>
>>>
>>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <[email protected]> wrote:
>>>>
>>>>> The call to Sorting.quickSort is not working. Perhaps I am calling it the
>>>>> wrong way.
>>>>
>>>> allaggregates.toArray allocates and creates a new array separate from
>>>> allaggregates, and it is that new array which is sorted by
>>>> Sorting.quickSort; allaggregates itself is left unsorted. Try:
>>>>
>>>>   val sortedAggregates = allaggregates.toArray
>>>>   Sorting.quickSort(sortedAggregates)
>>>>
>>>>> I would like to use the Spark mllib class MultivariateStatisticalSummary
>>>>> to calculate the statistical values.
>>>>
>>>> MultivariateStatisticalSummary is a trait (similar to a Java interface);
>>>> you probably want to use MultivariateOnlineSummarizer.
>>>>
>>>>> For that I would need to keep all my intermediate values as RDD so that I
>>>>> can directly use the RDD methods to do the job.
>>>>
>>>> Correct; you would do an aggregate using the add and merge functions
>>>> provided by MultivariateOnlineSummarizer.
>>>>
>>>>> At the end I also need to write the results to HDFS for which there is a
>>>>> method provided on the RDD class to do so, which is another reason I would
>>>>> like to retain everything as RDD.
>>>>
>>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
>>>> or you could unpack the relevant statistics from
>>>> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
>>>> and then write.
>>>>
>>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <[email protected]> wrote:
>>>> I have to do the following tasks on a dataset using Apache Spark with
>>>> Scala as the programming language:
>>>>
>>>> 1. Read the dataset from HDFS. A few sample lines look like this:
>>>>
>>>>      deviceid,bytes,eventdate
>>>>      15590657,246620,20150630
>>>>      14066921,1907,20150621
>>>>      14066921,1906,20150626
>>>>      6522013,2349,20150626
>>>>      6522013,2525,20150613
>>>>
>>>> 2. Group the data by device id. Thus we now have a map of deviceid =>
>>>>    (bytes,eventdate)
>>>> 3. For each device, sort the set by eventdate. We now have an ordered set of
>>>>    bytes based on eventdate for each device.
>>>> 4. Pick the last 30 days of bytes from this ordered set.
>>>> 5. Find the moving average of bytes for the last date using a time period of
>>>>    30.
>>>> 6. Find the standard deviation of the bytes for the final date using a time
>>>>    period of 30.
>>>> 7. Return two values in the result: (mean - k*stddev) and (mean + k*stddev)
>>>>    [Assume k = 3]
>>>>
>>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
>>>> run on a billion rows finally.
>>>> Here is the data structure for the dataset.
>>>>
>>>> package com.testing
>>>>
>>>> case class DailyDeviceAggregates (
>>>>   device_id: Integer,
>>>>   bytes: Long,
>>>>   eventdate: Integer
>>>> ) extends Ordered[DailyDeviceAggregates] {
>>>>   // Order records chronologically by eventdate (yyyyMMdd)
>>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>>     eventdate - that.eventdate
>>>>   }
>>>> }
>>>>
>>>> object DailyDeviceAggregates {
>>>>   // Parse one CSV line of the form deviceid,bytes,eventdate
>>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>>     val c = logline.split(",")
>>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>>   }
>>>> }
>>>>
>>>> The DeviceAnalyzer class looks like this:
>>>> I have a very crude implementation that does the job, but it is not up to
>>>> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>>>> basic. Here is what I have now:
>>>>
>>>> import com.testing.DailyDeviceAggregates
>>>> import org.apache.spark.{SparkContext, SparkConf}
>>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>>
>>>> import scala.util.Sorting
>>>>
>>>> object DeviceAnalyzer {
>>>>   def main(args: Array[String]) {
>>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>>     val sc = new SparkContext(sparkConf)
>>>>
>>>>     val logFile = args(0)
>>>>
>>>>     val deviceAggregateLogs =
>>>>       sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>>
>>>>     // Calculate statistics based on bytes
>>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>>
>>>>     deviceIdsMap.foreach(a => {
>>>>       val device_id = a._1 // This is the device ID
>>>>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>>>>
>>>>       println(allaggregates)
>>>>       // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>>       Sorting.quickSort(allaggregates.toArray)
>>>>       println(allaggregates) // This does not work - results are not sorted !!
>>>>
>>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>>       val count = byteValues.count(A => true)
>>>>       val sum = byteValues.sum
>>>>       val xbar = sum / count
>>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>>
>>>>       val vector: Vector = Vectors.dense(byteValues)
>>>>       println(vector)
>>>>       println(device_id + "," + xbar + "," + stddev)
>>>>
>>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>>       //println(vector)
>>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>>     })
>>>>
>>>>     sc.stop()
>>>>   }
>>>> }
>>>>
>>>> I would really appreciate it if someone could suggest improvements for the
>>>> following:
>>>>
>>>> - The call to Sorting.quickSort is not working. Perhaps I am calling it the
>>>>   wrong way.
>>>> - I would like to use the Spark mllib class MultivariateStatisticalSummary
>>>>   to calculate the statistical values.
>>>> - For that I would need to keep all my intermediate values as RDD so that I
>>>>   can directly use the RDD methods to do the job.
>>>> - At the end I also need to write the results to HDFS for which there is a
>>>>   method provided on the RDD class to do so, which is another reason I would
>>>>   like to retain everything as RDD.
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Anupam Bagchi