In general, working out the minimum or maximum of, say, prices (I do not know your use case) is pretty straightforward.
For example:

    val maxValue = price.reduceByWindow((x: Double, y: Double) => if (x > y) x else y,
      Seconds(windowLength), Seconds(slidingInterval))
    maxValue.print()

The average values are running totals. You could possibly try:

    lines.foreachRDD { rdd =>
      // assuming the value of interest is the third comma-separated field
      val values = rdd.map(_.split(',')(2).toDouble)
      val count  = values.count()
      val sum    = values.sum()
      val avg    = sum / count
      println(s"Average for this batch: $avg")
    }

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 6 May 2016 at 16:28, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Matthias,
>
> Say you have the following:
>
> The "batch interval" is the basic interval at which the system receives
> the data in batches.
>
> val ssc = new StreamingContext(sparkConf, Seconds(n))
> // window length - the duration of the window; must be a multiple of the
> // batch interval n
> val windowLength = x
> // sliding interval - the interval at which the window operation is
> // performed, in other words data is collected within this "previous
> // interval x"
> val slidingInterval = y
>
> OK, so you want to use something like the below to get the average value
> within the y interval for a given parameter? The logic may be incorrect
> below:
>
> val countByValueAndWindow = RS.filter(_ > AVG(VALUE))
>   .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
> 2016-05-06 15:54 GMT+01:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:
>
>> Hi,
>>
>> If I want a sliding average over 10 minutes for some keys, I can do
>> something like groupBy(window(…), "my-key").avg("some-values") in Spark 2.0.
>>
>> I am trying to implement this sliding average using Spark 1.6.x:
>> I tried reduceByKeyAndWindow but did not find a solution. IMO I have to
>> keep all the values in the window to compute the average. One way would
>> be to add every new value to a list in the reduce function and then do
>> the avg computation in a separate map, but this seems kind of ugly.
>>
>> Do you have an idea how to solve this?
>>
>> Thanks!
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
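
PS. One possible way to get the per-key sliding average in Spark 1.6 without keeping all the values is to reduce (sum, count) pairs with reduceByKeyAndWindow and divide afterwards. The sketch below is illustrative only and makes several assumptions: the input arrives on a hypothetical socket source at localhost:9999, the key is the first comma-separated field, the value is the third, and the checkpoint path is a placeholder. Adapt to your own source and schema.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("SlidingAverage")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // the invertible (inverse-function) window variant below requires checkpointing
    ssc.checkpoint("/tmp/spark-checkpoint")

    // hypothetical source: key in field 0, value in field 2
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(','))
      .map(fields => (fields(0), fields(2).toDouble))

    // keep running (sum, count) per key over the window; the inverse function
    // lets Spark update the window incrementally as old batches fall out
    val sumAndCount = pairs
      .mapValues(v => (v, 1L))
      .reduceByKeyAndWindow(
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),  // add new data
        (a: (Double, Long), b: (Double, Long)) => (a._1 - b._1, a._2 - b._2),  // remove old data
        Seconds(600),  // window length: 10 minutes
        Seconds(10))   // sliding interval

    val avgByKey = sumAndCount.mapValues { case (sum, count) => sum / count }
    avgByKey.print()

    ssc.start()
    ssc.awaitTermination()

This avoids collecting the individual values into a list, at the cost of only being applicable to aggregates (like sum/count) that can be combined and, for the incremental variant, inverted.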