Hi,

Sorry for not replying earlier; I've been on vacation. NW in the previous message refers to "network", so the data in question is basically an aggregate of packet traces. We have two columns: one for the bytes sent during each millisecond and one for the bytes received during the same millisecond.
As I described earlier, we wanted to do a quick check of whether there is any correlation between received and sent bytes at different lags between the columns. I eventually parallelized this computation using Spark. MLlib has a fairly new function called sliding which can be applied to an RDD with a given window size (1000 in our case). I then use accumulators, one of them holding an array with one slot per lag, i.e. no map/reduce phases, only a single foreach. I think the sliding function would be a nice addition to Flink, as it would enable many time series analysis algorithms.

Snippet from the code (cleaned up slightly; the implicit AccumulatorParam at the top is what lets Spark merge the per-lag array sums, and the second import brings sliding into scope):

import org.apache.spark.AccumulatorParam
import org.apache.spark.mllib.rdd.RDDFunctions._

// Merge per-lag partial sums element-wise (needed by the array accumulator)
implicit object ArrayAccum extends AccumulatorParam[Array[Double]] {
  def zero(v: Array[Double]): Array[Double] = new Array[Double](v.length)
  def addInPlace(a: Array[Double], b: Array[Double]): Array[Double] = {
    for (i <- a.indices) a(i) += b(i)
    a
  }
}

// Calculate the numerator and denominator parts of the Pearson correlation formula
val corr_numerator = sc.accumulator(new Array[Double](window_size))
val y1_adjsum = sc.accumulator(0.0)
val y2_adjsum = sc.accumulator(0.0)
data.sliding(window_size).foreach(w => {
  // One product per lag: the window head of y1 against each lagged y2 value
  corr_numerator += w.map(v => (w(0)(y1_idx) - y1_mean) * (v(y2_idx) - y2_mean))
  y1_adjsum += math.pow(w(0)(y1_idx) - y1_mean, 2)
  y2_adjsum += math.pow(w(0)(y2_idx) - y2_mean, 2)
})
// Calculate the Pearson correlation for each lag in the window
val corr_denom = math.pow(y1_adjsum.value * y2_adjsum.value, 0.5)
val res = corr_numerator.value.map(x => x / corr_denom)
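For reference, this is roughly how the inputs the snippet assumes (data, window_size, the column indices and the means) could be prepared; the file name, app name and column order below are placeholders, not our actual setup:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lag-correlation"))

val window_size = 1000
val (y1_idx, y2_idx) = (0, 1)  // assumed order: column 0 sent bytes, column 1 received bytes

// Placeholder path: one CSV row per millisecond, two numeric columns
val data = sc.textFile("nw_trace.csv")
  .map(_.split(",").map(_.toDouble))
  .cache()

val n = data.count().toDouble
val y1_mean = data.map(_(y1_idx)).sum() / n
val y2_mean = data.map(_(y2_idx)).sum() / n

As a quick sanity check of the windowing itself: with the RDDFunctions import in scope, sc.parallelize(1 to 5).sliding(3).collect() yields Array(Array(1, 2, 3), Array(2, 3, 4), Array(3, 4, 5)), i.e. only full windows are emitted.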
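Since res ends up as a plain Array[Double] with one correlation per lag, inspecting the strongest lags is straightforward; a small sketch:

// res(k) is the correlation between sent bytes at time t
// and received bytes at time t + k; list the strongest lags
res.zipWithIndex
  .sortBy { case (c, _) => -math.abs(c) }
  .take(5)
  .foreach { case (c, lag) => println(f"lag $lag%4d ms -> r = $c%.3f") }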
Best regards,
Stefan