Hi Daniel,

I understood Sébastien to be talking about having a high number of keys; I guess I was biased by my own problem! :) Anyway, I don't think you need to use disk or a database to generate an RDD per key: you can use filter, which I guess would be more efficient because IO is avoided, especially if the RDD is cached. For example:
// in the spark shell
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// generate a map from each key to the RDD of that key's values
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  (for (k <- keys) yield
    k -> pairRDD.filter(_._1 == k).values
  ).toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs.map(x => (x % 10, x))
val gs = groupByKeyToRDDs(ixs)
gs(1).collect

Just an idea.

Greetings,

Juan Rodriguez

2015-04-29 14:20 GMT+02:00 Daniel Darabos <daniel.dara...@lynxanalytics.com>:

> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method
> for saving the RDD into separate files by key in a single pass. Then you
> can read the files into separate RDDs.
>
> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Sébastien,
>>
>> I ran into a similar problem some time ago; you can see the discussion in
>> the Spark users mailing list at
>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
>> My experience was that when you create too many RDDs the Spark scheduler
>> gets stuck, so if you have many keys in the map you are creating you'll
>> probably have problems. On the other hand, the last example I proposed in
>> that mailing thread was a batch job in which we start from a single RDD of
>> time-tagged data, transform it into a list of RDDs corresponding to windows
>> generated according to the time tag of the records, and then apply a
>> transformation to each window RDD, for example KMeans.run of MLlib. This is
>> very similar to what you propose.
>> So in my humble opinion the approach of generating thousands of RDDs by
>> filtering doesn't work, and a new RDD class should be implemented for this.
>> I have never implemented a custom RDD, but if you want some help I would be
>> happy to join you in this task.
>>
>
> Sebastien said nothing about thousands of keys. This is a valid problem
> even if you only have two different keys.
>
>> Greetings,
>>
>> Juan
>>
>>
>>
>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère <s.sou...@gmail.com>:
>>
>> > Hello,
>> >
>> > I'm facing a problem with custom RDD transformations.
>> >
>> > I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning a
>> > map of RDDs by key.
>> >
>> > This would be great, for example, in order to run MLlib clustering on
>> > the V values grouped by K.
>> >
>> > I know I could do it using filter() on my RDD as many times as I have
>> > keys, but I'm afraid this would not be efficient (the entire RDD would
>> > be read each time, right?). Then, I could mapPartitions my RDD before
>> > filtering, but the code ends up being huge...
>> >
>> > So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
>> > V]): Map[K, RDD[V]] method, which would iterate over the RDD only once,
>> > but I cannot complete my implementation.
>> >
>> > Please, could you tell me first if this is really feasible, and then,
>> > could you give me some pointers?
>> >
>> > Thank you,
>> > Regards,
>> > Sebastien
>> >
>> >
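
P.S. For completeness, here is a rough sketch of the save-by-key approach from the Stack Overflow answer Daniel linked, in case it is useful for comparison. It relies on Hadoop's MultipleTextOutputFormat so each key's records land in their own file in a single pass; the class and method names I introduce below (RDDKeyBasedOutput, saveByKey) are only illustrative, and it assumes keys and values that serialize sensibly as text:

// Rough sketch of the single-pass "save by key" technique; illustrative names.
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD

// Routes each record to a file named after its key.
class RDDKeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // file name under the output directory = the key itself
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
  // drop the key from the written record, keeping only the value
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// One pass over the pair RDD writes one file per key under outputPath.
def saveByKey(pairRDD: RDD[(String, String)], outputPath: String): Unit =
  pairRDD.saveAsHadoopFile(outputPath, classOf[String], classOf[String],
    classOf[RDDKeyBasedOutput])

Reading a key's file back with sc.textFile(outputPath + "/" + key) would then give one RDD per key without filtering the full dataset repeatedly, at the cost of the extra IO.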