Hi Sampo,
There is a sliding method in org.apache.spark.mllib.rdd.RDDFunctions that you could try, though it is marked as a DeveloperApi (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.rdd.RDDFunctions._

    object Test {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        sparkConf.setMaster("local").setAppName("sandbox")
        val sc = new SparkContext(sparkConf)

        val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
        val rdd = sc.parallelize(arr)
        val sorted = rdd.sortByKey(true)
        // sliding(2) yields overlapping windows of two consecutive elements
        print(sorted.sliding(2).map(x => (x(0), x(1))).collect().toSeq)

        sc.stop()
      }
    }

This prints:

    WrappedArray(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))

Alternatively, you could convert your RDD to a DataFrame and then use Spark SQL window functions such as LEAD/LAG.

Best regards,
Fanilo

From: Dylan Hogg [mailto:dylanh...@gmail.com]
Sent: Thursday, 22 October 2015 13:44
To: Sampo Niskanen
Cc: user
Subject: Re: Analyzing consecutive elements

Hi Sampo,

You could try zipWithIndex followed by a self-join on shifted index values, like this:

    // sc is the SparkContext (predefined in spark-shell)
    val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
    val rdd = sc.parallelize(arr)
    val sorted = rdd.sortByKey(true)
    // Key each element by its position in the sorted order: (index, element)
    val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
    // Shift indices down by one so that element i+1 joins with element i
    val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)
    pairs.collect().foreach(println)

This produces the consecutive elements as pairs in the RDD for further processing:

    (0,((1,A),(3,B)))
    (1,((3,B),(7,C)))
    (2,((7,C),(8,D)))
    (3,((8,D),(9,E)))

There are probably more efficient ways to do this, but if your dataset isn't too big, it should work for you.

Cheers,
Dylan.

On 22 October 2015 at 17:35, Sampo Niskanen <sampo.niska...@wellmo.com> wrote:

Hi,

I have analytics data with timestamps on each element. I'd like to analyze consecutive elements using Spark, but haven't figured out how to do this.
Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E] to an RDD [(A,B), (B,C), (C,D), (D,E)]. (Or some other way to analyze time-related elements.)

How can this be achieved?

Sampo Niskanen
Lead developer / Wellmo
sampo.niska...@wellmo.com
+358 40 820 5291

________________________________

This e-mail and the documents attached are confidential and intended solely for the addressee; it may also be privileged. If you receive this e-mail in error, please notify the sender immediately and destroy it. As its integrity cannot be secured on the Internet, the Worldline liability cannot be triggered for the message content. Although the sender endeavours to maintain a computer virus-free network, the sender does not warrant that this transmission is virus-free and will not be liable for any damages resulting from any virus transmitted.
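Fanilo's second suggestion, converting the RDD to a DataFrame and using the LEAD/LAG window functions, is not shown in code in this thread. A minimal sketch might look like the following. It assumes Spark 2.x or later, where SparkSession is the entry point; in the Spark 1.x releases current at the time, window functions needed a HiveContext instead. The object name ConsecutivePairs, the helper withNext and the column names ts, value, nextTs and nextValue are hypothetical, chosen only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

object ConsecutivePairs {
  // Hypothetical helper: attaches the next row's ts and value (ordered by ts)
  // to each row, then drops the last row, which has no successor.
  def withNext(df: DataFrame): DataFrame = {
    // No partitionBy: a single global ordering, so Spark moves all rows into
    // one partition (and logs a warning about it).
    val w = Window.orderBy("ts")
    df.withColumn("nextTs", lead("ts", 1).over(w))
      .withColumn("nextValue", lead("value", 1).over(w))
      // lead(...) returns null for the last row; filter it out
      .where(col("nextTs").isNotNull)
  }

  def main(args: Array[String]): Unit = {
    // Assumes Spark 2.x+ (SparkSession); not the Spark 1.x API of 2015
    val spark = SparkSession.builder()
      .master("local")
      .appName("sandbox")
      .getOrCreate()
    import spark.implicits._

    val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
    // Convert the RDD of (timestamp, value) tuples into a DataFrame
    // with the hypothetical column names "ts" and "value"
    val df = spark.sparkContext.parallelize(arr).toDF("ts", "value")

    withNext(df).orderBy("ts").show()
    spark.stop()
  }
}
```

For the five-element example above, this should produce the same four consecutive pairs as the sliding and join approaches, as rows of (ts, value, nextTs, nextValue). Because the window has no partitionBy, all rows are ordered in a single partition, so it carries a similar "dataset not too big" caveat; if the data has a natural grouping key (a user id, say), adding partitionBy on that key lets Spark process groups in parallel. Using lag instead of lead gives (previous, current) pairs rather than (current, next).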