You should be able to achieve what you're looking for by using foldByKey to find the latest record for each key. If you're relying on the order of elements within the file to determine which ones are the "latest" (rather than sorting by some field within the records themselves), call zipWithIndex first to give each element a numeric index that you can use for comparisons.
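To see the "latest record wins" idea in isolation, here is a toy sketch using plain Scala collections (not Spark; the sample keys and values are made up). It mirrors what foldByKey does per key: tag each element with its position, then keep the element with the highest position for each key.

```scala
// Toy data: "k1" appears twice; the later occurrence should win
val records = Seq("k1" -> "old", "k2" -> "only", "k1" -> "new")

val latest: Map[String, String] =
  records.zipWithIndex                  // attach each element's position: ((k, v), n)
    .groupBy { case ((k, _), _) => k }  // group entries by key
    .map { case (k, group) =>
      // within each key's group, keep the entry with the highest index
      val ((_, v), _) = group.maxBy { case (_, n) => n }
      k -> v
    }

// latest == Map("k1" -> "new", "k2" -> "only")
```

The Spark version below follows the same shape, except that the per-key reduction is expressed as a fold so it can run distributed.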
For example (type annotations are unnecessary but included for clarity):

val parsedRecords: RDD[(Key, Value)] = ???

// zipWithIndex attaches each element's position within the RDD (a Long)
val indexedRecords: RDD[(Key, (Value, Long))] =
  parsedRecords.zipWithIndex.map { case ((k, v), n) => k -> (v, n) }

// For each key, keep whichever element has the higher index (i.e., the latest)
val latestRecords: RDD[(Key, Value)] =
  indexedRecords.foldByKey(null) { (a, b) =>
    (a, b) match {
      case (null, _) => b
      case ((_, an), (_, bn)) if an < bn => b
      case _ => a
    }
  }.mapValues { case (v, _) => v }

You can then write latestRecords out to a file however you like. Note that I would recommend using string interpolation or the CSV output format (for DataFrames) rather than the string replacement you are currently using to format your output.

On Sat, Oct 10, 2015 at 1:11 PM, Kali <kali.tumm...@gmail.com> wrote:

> Hi Richard,
>
> Requirement is to get latest records using a key i think hash map is a
> good choice for this task.
> As of now data comes from third party and we are not sure what's the
> latest record is so hash map is chosen.
> Is there anything better than hash map please let me know.
>
> Thanks
> Sri
>
> Sent from my iPad
>
> On 10 Oct 2015, at 17:10, Richard Eggert <richard.egg...@gmail.com> wrote:
>
> Do you need the HashMap for anything else besides writing out to a file?
> If not, there is really no need to create one at all. You could just keep
> everything as RDDs.
> On Oct 10, 2015 11:31 AM, "kali.tumm...@gmail.com" <kali.tumm...@gmail.com>
> wrote:
>
>> Got it ..., created hashmap and saved it to file please follow below
>> steps ..
>>
>> val QuoteRDD=quotefile.map(x => x.split("\\|")).
>>   filter(line => line(0).contains("1017")).
>>   map(x => ((x(5)+x(4)), (x(5), x(4), x(1),
>>     if (x(15) == "B")
>>       (
>>         {if (x(25) == "") x(9) else x(25)},
>>         {if (x(37) == "") x(11) else x(37)}
>>       )
>>     else if (x(15) == "C")
>>       (
>>         {if (x(24) == "") x(9) else x(24)},
>>         {if (x(30) == "") x(11) else x(30)}
>>       )
>>     else if (x(15) == "A")
>>       (x(9), x(11))
>>   )))
>>
>> val QuoteHashMap=QuoteRDD.collect().toMap
>> val test=QuoteHashMap.values.toSeq
>> val test2=sc.parallelize(test.map(x =>
>>   x.toString.replace("(","").replace(")","")))
>>
>> test2.saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\mkdata\\test.txt")
>> test2.collect().foreach(println)
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996p25014.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
Rich