Map each RDD to key-value pairs keyed on the fields they share, then join on that key.

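import org.apache.spark.SparkContext._ // pair-RDD functions such as join (needed before Spark 1.3)

// Key both RDDs by (year, month, day, hour), then join on that key.
// Assumes CallRow has an `hour: Int` field and BillRow.hour is an Int.
val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).values.map { case (p, c) =>
  BillRow(c.customer, c.hour, c.minutes * p.basePrice) }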

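You should be able to extend this approach to three RDDs too: re-key the joined result by the fields it shares with the third RDD (for offersInCourse, only the hour) and join again.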
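A minimal sketch of the three-RDD version follows. It assumes CallRow has an `hour` field (as the question's for-comprehension implies), that BillRow.hour is an Int hour of day, and it reuses the question's billPerHour formula (minutes * basePrice * discount). The object name ThreeWayJoin and the sample data are hypothetical. The first join uses the full date-and-hour key; because offers only carry an hour, the intermediate result is re-keyed by hour before the second join.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (join, values); only required before Spark 1.3
import org.apache.spark.rdd.RDD

case class PriceRow(year: Int, month: Int, day: Int, hour: Int, basePrice: Float)
// Assumption: CallRow carries an `hour` field, as the question's for-comprehension implies.
case class CallRow(customer: String, year: Int, month: Int, day: Int, hour: Int, minutes: Int)
case class OfferRow(offerName: String, hour: Int, discount: Float)
// Assumption: BillRow.hour is the hour of day as an Int, not a DateTime.
case class BillRow(customer: String, hour: Int, amount: Float)

object ThreeWayJoin { // hypothetical name
  def bills(callPrices: RDD[PriceRow],
            calls: RDD[CallRow],
            offersInCourse: RDD[OfferRow]): RDD[BillRow] = {
    // Join 1: prices and calls share the full (year, month, day, hour) key.
    val pricedCalls: RDD[(PriceRow, CallRow)] =
      callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
        .join(calls.map(c => ((c.year, c.month, c.day, c.hour), c)))
        .values

    // Join 2: offers are keyed by hour of day only, so re-key the
    // intermediate result by hour and join again. This is an inner join:
    // calls in an hour with no offer are dropped.
    pricedCalls
      .map { case (p, c) => (c.hour, (p, c)) }
      .join(offersInCourse.map(o => (o.hour, o)))
      .values
      .map { case ((p, c), o) =>
        // billPerHour from the question: minutes * basePrice * discount
        BillRow(c.customer, c.hour, c.minutes * p.basePrice * o.discount)
      }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("three-way-join"))
    try {
      val prices = sc.parallelize(Seq(PriceRow(2014, 4, 29, 10, 0.5f)))
      val calls = sc.parallelize(Seq(
        CallRow("alice", 2014, 4, 29, 10, 20),
        CallRow("bob", 2014, 4, 29, 11, 30))) // no price for hour 11: dropped by join 1
      val offers = sc.parallelize(Seq(OfferRow("happy-hour", 10, 0.5f)))
      bills(prices, calls, offers).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With the sample data only alice's call survives both joins. Note that join emits one row per matching pair, so if several offers share an hour, each call in that hour produces one bill per offer; pick a single offer per hour first (for example with reduceByKey) if that is not what you want. Since offersInCourse has at most 24 distinct hours, another option is to collect it into a Map and ship it with sc.broadcast, looking up the discount inside the map instead of doing the second, shuffling join.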


On Tue, Apr 29, 2014 at 11:55 AM, jsantos <[email protected]> wrote:

> In the context of the telecom industry, let's suppose we have several
> existing RDDs populated from some tables in Cassandra:
>
>         val callPrices: RDD[PriceRow]
>         val calls: RDD[CallRow]
>         val offersInCourse: RDD[OfferRow]
>
> where types are defined as follows,
>
>         /** Represents the price per minute for a concrete hour */
>         case class PriceRow(
>                 val year: Int,
>                 val month: Int,
>                 val day: Int,
>                 val hour: Int,
>                 val basePrice: Float)
>
>         /** Call records */
>         case class CallRow(
>                 val customer: String,
>                 val year: Int,
>                 val month: Int,
>                 val day: Int,
>                 val hour: Int,
>                 val minutes: Int)
>
>         /** Is there any discount that could be applicable here? */
>         case class OfferRow(
>                 val offerName: String,
>                 val hour: Int,//[0..23]
>                 val discount: Float)//[0..1]
>
> Assuming we cannot use `flatMap` to combine these three RDDs this way
> (RDD is not really 'monadic': `RDD.flatMap` expects a function returning a
> local collection, not another RDD):
>
>         /**
>          * The final bill at a concrete hour for a call
>          * is defined as {{{
>          *    def billPerHour(minutes: Int, basePrice: Float, discount: Float) =
>          *      minutes * basePrice * discount
>          * }}}
>          */
>         val bills: RDD[BillRow] = for{
>                 price <- callPrices
>                 call <- calls if call.hour==price.hour
>                 offer <- offersInCourse if offer.hour==price.hour
>         } yield BillRow(
>                 call.customer,
>                 call.hour,
>                 billPerHour(call.minutes,price.basePrice,offer.discount))
>
>         case class BillRow(
>                 val customer: String,
>                 val hour: Int,
>                 val amount: Float)
>
> What is the best practice for generating a new RDD that joins all three of
> these RDDs and represents the bill for a given customer?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
