I did this:

import org.apache.spark.rdd.RDD

// Collect the small side (listings) to the driver, keyed by item ID, and
// broadcast it so that each executor receives one read-only copy.
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap()
val broadCastMap = sc.broadcast(lstgItemMap)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions { iter =>
    // Read the broadcast value once per partition.
    val lstgItemMap = broadCastMap.value
    for {
      (itemId, viDetail) <- iter
      // Skip events whose item has no listing (inner-join semantics).
      if lstgItemMap.contains(itemId)
    } yield {
      val listing = lstgItemMap(itemId)
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = "0"
      viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
    }
  }
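The skip can also be written with flatMap, as Punya suggests further down the thread: look each key up in the broadcast map and return an Option, which flatMap drops when it is None. Below is a minimal plain-Scala sketch of the per-partition function. `MapSideJoinSketch`, `joinPartition` and the Long/String sample data are hypothetical stand-ins for the listing and DetailInputRecord types, so it runs without Spark.

```scala
object MapSideJoinSketch {
  // Hypothetical helper: joins one partition of the large side against the
  // small side, which has already been collected into a Map.
  // Keys missing from `small` give None, and flatMap drops them, which is
  // the same inner-join behaviour that RDD.join provides.
  def joinPartition[K, S, L](small: collection.Map[K, S])(
      iter: Iterator[(K, L)]): Iterator[(K, (S, L))] =
    iter.flatMap { case (key, large) =>
      small.get(key).map(s => (key, (s, large)))
    }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the broadcast listings map and one partition of viEvents.
    val listingsById = Map(1L -> "listing-1", 3L -> "listing-3")
    val partition = Iterator(1L -> "vi-a", 2L -> "vi-b", 3L -> "vi-c")
    val joined = joinPartition(listingsById)(partition).toList
    println(joined) // prints List((1,(listing-1,vi-a)), (3,(listing-3,vi-c)))
  }
}
```

Assuming the same `broadCastMap` and `viEvents` as above, the Spark call would presumably be `viEvents.mapPartitions(iter => MapSideJoinSketch.joinPartition(broadCastMap.value)(iter))`, followed by a `.map` that builds the VISummary. Reading `broadCastMap.value` inside the closure, rather than extracting the map on the driver and capturing it, keeps each task on the executor's broadcast copy instead of serializing the whole map into every closure.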
Earlier:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = "0"
      viSummary.isBin = if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }
Waiting for run to complete.
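To check that the broadcast version can replace the join, both can be compared on small local data. The sketch below emulates the inner-join semantics of `lstgItem.join(viEvents)` and the `collectAsMap` + lookup approach with plain Scala collections (no Spark). The object, both function names and the sample data are hypothetical.

```scala
object JoinEquivalenceSketch {
  // Local emulation of RDD.join: an inner join that emits one row for
  // every (left, right) pair sharing a key.
  def shuffleStyleJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val rightByKey = right.groupBy(_._1)
    for {
      (k, a) <- left
      (_, b) <- rightByKey.getOrElse(k, Seq.empty[(K, B)])
    } yield (k, (a, b))
  }

  // Local emulation of the broadcast approach: turn the small side into a
  // Map (like collectAsMap) and look each large-side key up in it.
  def broadcastStyleJoin[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val smallMap = small.toMap // a later duplicate key overwrites an earlier one
    large.flatMap { case (k, b) => smallMap.get(k).map(a => (k, (a, b))) }
  }

  def main(args: Array[String]): Unit = {
    val listings = Seq(1L -> "listing-1", 3L -> "listing-3")
    val viEvents = Seq(1L -> "vi-a", 2L -> "vi-b", 3L -> "vi-c", 1L -> "vi-d")
    // Sort both results so that only their contents, not their order, are compared.
    val viaJoin = shuffleStyleJoin(listings, viEvents).sortBy(_.toString)
    val viaBroadcast = broadcastStyleJoin(listings, viEvents).sortBy(_.toString)
    println(viaJoin == viaBroadcast) // prints true
  }
}
```

One caveat this comparison exposes: Spark's `collectAsMap` keeps only one value per key, while `join` emits a row for every matching pair, so the two approaches agree only if `getItemId` is unique within `listings`.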
On Tue, Apr 21, 2015 at 9:54 AM, Punyashloka Biswal <[email protected]>
wrote:
> Could you do it using flatMap?
>
> Punya
>
> On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
> wrote:
>
>> The reason I am asking is that I don't understand how to skip a
>> record.
>>
>> 1) Broadcast the small table-1 as a map.
>> 2) Just do .map() on the large table-2.
>> When you do .map(), you must map each element to exactly one new element.
>> However, with a map-side join, when I get the broadcast map I look up each
>> key in it, and if the key is not found in the map I want to skip that
>> input altogether. (This is what .join does automatically; with a map-side
>> join you have to do it yourself.) I am assuming you do it with
>> mapPartitions and yield.
>>
>> Working code would help me understand it better.
>>
>> On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
>> wrote:
>>
>>> Can someone share working code for a map-side join in Spark + Scala?
>>> (No Spark SQL.)
>>>
>>> The only resource I could find was this one (open it in Chrome with the
>>> Chinese-to-English translator):
>>>
>>> http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
--
Deepak