You should be able to achieve what you're looking for by using foldByKey to
find the latest record for each key. If you're relying on the order of
elements within the file to determine which ones are the "latest" (rather
than sorting by some field within the file itself), call zipWithIndex first
to give each element a numeric index that you can use for comparisons.

For example (type annotations are unnecessary but included for clarity):
val parsedRecords: RDD[(Key, Value)] = ???
// zipWithIndex produces a Long index (position of the element in the RDD)
val indexedRecords: RDD[(Key, (Value, Long))] =
  parsedRecords.zipWithIndex.map { case ((k, v), n) => (k, (v, n)) }
val latestRecords: RDD[(Key, Value)] =
  indexedRecords.foldByKey(null) { (a, b) =>
    (a, b) match {
      case (null, _) => b                      // a is the zero value
      case ((_, an), (_, bn)) if an < bn => b  // b appeared later
      case _ => a
    }
  }.mapValues { case (v, _) => v }
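
If it helps to see the idea without a cluster, here is a minimal sketch of
the same "keep the record with the highest index" logic on plain Scala
collections. The sample keys and values are made up for illustration, and
Seq.zipWithIndex stands in for the RDD method of the same name.

```scala
// Hypothetical sample data: (key, value) pairs in file order.
val records: Seq[(String, String)] = Seq(
  ("IBM", "10.0"),
  ("MSFT", "20.0"),
  ("IBM", "11.5"),
  ("MSFT", "19.0"),
  ("IBM", "12.0")
)

// Tag every record with its position, mirroring zipWithIndex on an RDD.
val indexed: Seq[(String, (String, Int))] =
  records.zipWithIndex.map { case ((k, v), n) => (k, (v, n)) }

// For each key, keep the value that carries the highest index.
val latest: Map[String, String] =
  indexed.groupBy(_._1).map { case (k, entries) =>
    (k, entries.map(_._2).maxBy(_._2)._1)
  }

latest.toSeq.sortBy(_._1).foreach(println)
```

With this sample, IBM ends up mapped to 12.0 and MSFT to 19.0. On an RDD,
reduceByKey with the same index comparison should also work and would not
need a null zero value.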

You can then write "latestRecords" out to a file however you like. Note
that I would recommend using string interpolation or the CSV output format
(for DataFrames) over that string replacement you are currently using to
format your output.
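
As a rough sketch of what I mean, assuming each record is a nested tuple
shaped like the ones in your code (the field names and values below are
hypothetical), you could build each output line explicitly instead of
stripping parentheses from toString:

```scala
// Hypothetical record shaped like the tuples in the quoted code:
// three plain fields followed by a nested pair.
val record: (String, String, String, (String, String)) =
  ("ABC", "XYZ", "1017", ("10.5", "11.0"))

// Destructure the tuple and format it with string interpolation.
val (f1, f2, f3, (first, second)) = record
val interpolated: String = s"$f1,$f2,$f3,$first,$second"

// Alternatively, collect the fields and join them with mkString.
val joined: String = Seq(f1, f2, f3, first, second).mkString(",")

println(interpolated)
```

Unlike replace("(", "").replace(")", ""), this leaves any parentheses
that occur inside the field values untouched.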

On Sat, Oct 10, 2015 at 1:11 PM, Kali <kali.tumm...@gmail.com> wrote:

> Hi Richard,
>
> The requirement is to get the latest record for each key, and I think a
> hash map is a good choice for this task.
> As of now the data comes from a third party and we are not sure which
> record is the latest, so a hash map was chosen.
> If there is anything better than a hash map, please let me know.
>
> Thanks
> Sri
>
> On 10 Oct 2015, at 17:10, Richard Eggert <richard.egg...@gmail.com> wrote:
>
> Do you need the HashMap for anything else besides writing out to a file?
> If not, there is really no need to create one at all.  You could just keep
> everything as RDDs.
> On Oct 10, 2015 11:31 AM, "kali.tumm...@gmail.com" <kali.tumm...@gmail.com>
> wrote:
>
>> Got it. I created a hash map and saved it to a file; the steps are
>> below.
>>
>>  val QuoteRDD=quotefile.map(x => x.split("\\|")).
>>       filter(line => line(0).contains("1017")).
>>       map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
>>       if (x(15) =="B")
>>         (
>>           {if (x(25) == "") x(9)  else x(25)},
>>          {if (x(37) == "") x(11) else x(37)}
>>         )
>>       else if (x(15) =="C" )
>>         (
>>            {if (x(24) == "") (x(9))  else x(24)},
>>            {if (x(30) == "") (x(11)) else x(30)}
>>         )
>>       else if (x(15) =="A")
>>          {(x(9),x(11))}
>>         )))
>>
>>
>>     val QuoteHashMap=QuoteRDD.collect().toMap
>>     val test=QuoteHashMap.values.toSeq
>>     val test2=sc.parallelize(test.map(x =>
>>       x.toString.replace("(","").replace(")","")))
>>
>>     test2.saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\mkdata\\test.txt")
>>     test2.collect().foreach(println)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996p25014.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


-- 
Rich
