Hi

Sorry for not replying earlier. I've been on vacation.
NW in the previous message refers to Network, so the data in question is
basically an aggregate of packet traces. We have two columns, one for bytes
sent during each millisecond and one for bytes received during the same
time period.

As I described earlier, we wanted to quickly check whether there was any
correlation between received and sent bytes at different lags between the
columns.
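The lag correlation we were after can be sketched in plain Scala, without
Spark. This is just an illustration of the statistic; the function name and
toy data are hypothetical:

```scala
// Pearson correlation between x(t) and y(t + lag), computed locally.
def laggedPearson(x: Array[Double], y: Array[Double], lag: Int): Double = {
  val n = x.length - lag            // number of overlapping pairs
  val xs = x.take(n)                // x(t)
  val ys = y.slice(lag, lag + n)    // y(t + lag)
  val xMean = xs.sum / n
  val yMean = ys.sum / n
  val num = xs.zip(ys).map { case (a, b) => (a - xMean) * (b - yMean) }.sum
  val den = math.sqrt(xs.map(a => (a - xMean) * (a - xMean)).sum *
                      ys.map(b => (b - yMean) * (b - yMean)).sum)
  num / den
}
```

For a series that is an exact shifted copy of the other, the correlation at
the matching lag comes out as 1.0.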

I eventually parallelized this computation using Spark. MLlib has a fairly
new function called sliding which can be applied to an RDD with a given
window size (1000 in our case). I then use an accumulator holding an array
with one entry per lag, i.e. no map/reduce phases, only a single foreach.

I think the sliding function would be a nice addition to Flink, as it would
enable many algorithms for time series analysis.
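For comparison, Scala's standard collections already provide a sliding
method on iterators with the same semantics (overlapping windows, step 1);
the MLlib version in org.apache.spark.mllib.rdd.RDDFunctions applies the
same idea to an RDD, stitching windows across partition boundaries:

```scala
// sliding(k) yields overlapping windows of length k, stepping by one element.
val windows = (1 to 5).iterator.sliding(3).map(_.toList).toList
// windows == List(List(1, 2, 3), List(2, 3, 4), List(3, 4, 5))
```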

Snippet from the code:

// Calculate the numerator and denominator parts of the Pearson correlation
// formula. corr_numerator accumulates an Array[Double] with one slot per
// lag, which requires an AccumulatorParam doing element-wise addition.
val corr_numerator = sc.accumulator(new Array[Double](window_size))
val y1_adjsum = sc.accumulator(0.0)
val y2_adjsum = sc.accumulator(0.0)

data.sliding(window_size).foreach { w =>
  // One product per lag: the first element of the window against every element.
  corr_numerator += w.map(v => (w(0)(y1_idx) - y1_mean) * (v(y2_idx) - y2_mean))
  y1_adjsum += math.pow(w(0)(y1_idx) - y1_mean, 2)
  y2_adjsum += math.pow(w(0)(y2_idx) - y2_mean, 2)
}

// Calculate the Pearson correlation for each lag in the window
val corr_denom = math.sqrt(y1_adjsum.value * y2_adjsum.value)
val res = corr_numerator.value.map(_ / corr_denom)
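The same accumulator logic can be sanity-checked locally with plain Scala
collections. This is a sketch with hypothetical toy data, not the Spark job
itself; as in the snippet, the column means are precomputed over the full
columns:

```scala
// Local, single-machine version of the per-lag accumulator logic.
val windowSize = 4 // 1000 in the real job
val y1 = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0)
val y2 = Array(2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0)
val y1Mean = y1.sum / y1.length
val y2Mean = y2.sum / y2.length

val corrNumerator = new Array[Double](windowSize)
var y1AdjSum = 0.0
var y2AdjSum = 0.0

// foreach over sliding windows, mirroring data.sliding(window_size).foreach
(y1 zip y2).sliding(windowSize).foreach { w =>
  for (lag <- 0 until windowSize)
    corrNumerator(lag) += (w(0)._1 - y1Mean) * (w(lag)._2 - y2Mean)
  y1AdjSum += math.pow(w(0)._1 - y1Mean, 2)
  y2AdjSum += math.pow(w(0)._2 - y2Mean, 2)
}

val corrDenom = math.sqrt(y1AdjSum * y2AdjSum)
val res = corrNumerator.map(_ / corrDenom)
```

With y2 an exact multiple of y1 as above, the lag-0 entry of res comes out
as 1.0, which is a quick way to check the bookkeeping.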


Best regards,
Stefan



