Create a key and join on that.
val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day,
p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).mapValues({ case (p, c) =>
BillRow(c.customer, c.hour, c.minutes * p.basePrice) }).values
You should be able to expand this approach to three RDDs too.
On Tue, Apr 29, 2014 at 11:55 AM, jsantos <[email protected]> wrote:
> In the context of telecom industry, let's supose we have several existing
> RDDs populated from some tables in Cassandra:
>
> val callPrices: RDD[PriceRow]
> val calls: RDD[CallRow]
> val offersInCourse: RDD[OfferRow]
>
> where types are defined as follows,
>
> /** Represents the price per minute for a concrete hour */
> case class PriceRow(
> val year: Int,
> val month: Int,
> val day: Int,
> val hour: Int,
> val basePrice: Float)
>
> /** Call registries*/
> case class CallRow(
> val customer: String,
> val year: Int,
> val month: Int,
> val day: Int,
> val minutes: Int)
>
> /** Is there any discount that could be applicable here? */
> case class OfferRow(
> val offerName: String,
> val hour: Int,//[0..23]
> val discount: Float)//[0..1]
>
> Assuming we cannot use `flatMap` to mix these three RDDs like this way
> (since RDD is not really 'monadic'):
>
> /**
> * The final bill at a concrete hour for a call
> * is defined as {{{
> * def billPerHour(minutes: Int,basePrice:Float,discount:Float)
> =
> * minutes * basePrice * discount
> * }}}
> */
> val bills: RDD[BillRow] = for{
> price <- callPrices
> call <- calls if call.hour==price.hour
> offer <- offersInCourse if offer.hour==price.hour
> } yield BillRow(
> call.customer,
> call.hour,
> billPerHour(call.minutes,price.basePrice,offer.discount))
>
> case class BillRow(
> val customer: String,
> val hour: DateTime,
> val amount: Float)
>
> which is the best practise for generating a new RDD that join all these
> three RDDs and represents the bill for a concrete customer?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>