It worked when I converted the nested RDD to an array

------------------------------
case class TradingTier(tierId: String, lowerLimit: Int, upperLimit: Int,
    transactionFees: Double)

// userTransactions: Seq[(accountId, numTransactions)]
val userTransactionsRDD =
  sc.parallelize(Seq(("id1", 2), ("id2", 4), ("id3", 1), ("id4", 3)))

val transactionTiersRDD = sc.parallelize(Seq(
  TradingTier("tier1", 0, 2, 9.00),
  TradingTier("tier2", 1, 4, 3.00),
  TradingTier("tier3", 3, 5, 5.00)))

// Materialize the small tier RDD as a driver-side array so it can be
// referenced inside the closure below (RDDs cannot be nested).
val transactionTiersArray = transactionTiersRDD.toArray

val userTransactionFees = userTransactionsRDD.flatMap(x =>
  transactionTiersArray
    .filter(y => y.upperLimit > x._2 && y.lowerLimit < x._2)
    .map(_.transactionFees))

// Note: zip assumes each account matches exactly one tier; if an account
// matches zero or several tiers the element counts diverge and zip fails.
val aggregateTransactionTiers2 = userTransactionsRDD.zip(userTransactionFees)
------------------------------
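For reference, a broadcast variable is the more idiomatic way to ship a small lookup table like this to the executors. A minimal sketch, reusing the names from the snippet above and assuming `sc` is an existing SparkContext:

------------------------------
// Broadcast the collected tier table instead of closing over a plain
// array; Spark ships the broadcast value to each executor only once.
val tiersBroadcast = sc.broadcast(transactionTiersRDD.collect())

// Produce (account, matchingFees) pairs directly with a map, which
// avoids the separate zip step and its element-count requirement.
val userFees = userTransactionsRDD.map { case (account, numTx) =>
  val fees = tiersBroadcast.value
    .filter(t => t.lowerLimit < numTx && numTx < t.upperLimit)
    .map(_.transactionFees)
  (account, fees)
}
------------------------------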




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/any-work-around-to-support-nesting-of-RDDs-other-than-join-tp3816p3820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
