To do it in one pass, you would conceptually need to consume the entire parent iterator and store the values either in memory or on disk, which is generally something you want to avoid, given that the length of the parent iterator is unbounded. If you need to start spilling to disk, you might actually get better performance from simply doing multiple passes, provided you don't have too many unique keys. In fact, the filter approach you mentioned earlier is conceptually the same as the implementation of randomSplit, where each of the split RDDs has access to the full parent RDD and then does its own sampling.
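To make that analogy concrete, here is a rough sketch for the spark-shell (the pair RDD, the key 0 and the 50/50 weights are all made up for the illustration, not taken from your code):

import org.apache.spark.rdd.RDD

val pairs: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 10, x))
pairs.cache()  // compute the parent once; every child below re-reads the cached blocks

// randomSplit: each split RDD wraps the full parent and samples from it
val splits: Array[RDD[(Int, Int)]] = pairs.randomSplit(Array(0.5, 0.5))

// per-key filter: each per-key RDD wraps the same full parent and filters it
val zeros: RDD[Int] = pairs.filter(_._1 == 0).values

// nothing has run yet; each child scans the parent only when an action is called
splits(0).count()
zeros.count()

Both children are just lazy views over the same parent, so caching the parent is what actually saves work.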
In addition, building the map is actually very cheap. Since it's lazy, you only run the filters when you need to iterate over the RDD for a specific key (there's a small sketch of this at the very bottom of this mail, below the quoted thread).

On Wed, Apr 29, 2015 at 9:57 AM Sébastien Soubré-Lanabère <s.sou...@gmail.com> wrote:

> Hi Juan, Daniel,
>
> thank you for your explanations. Indeed, I don't have a big number of
> keys, at least not enough to get the scheduler stuck.
>
> I was using a method quite similar to what you posted, Juan, and yes it
> works, but I think it would be more efficient not to call filter on each
> key. So, I was thinking of something like:
> - get the iterator of the KV RDD
> - distribute each value into a subset by key and then recreate an RDD
> from this subset
>
> Because the SparkContext parallelize method cannot be used inside a
> transformation, I wonder if I could do it by creating a custom RDD and
> then try to implement something like the PairRDDFunctions.lookup method,
> but replacing Seq[V], of course, with an RDD:
>
> def lookup(key: K): Seq[V] = {
>   self.partitioner match {
>     case Some(p) =>
>       val index = p.getPartition(key)
>       val process = (it: Iterator[(K, V)]) => {
>         val buf = new ArrayBuffer[V]
>         for (pair <- it if pair._1 == key) {
>           buf += pair._2
>         }
>         buf
>       } : Seq[V]
>       val res = self.context.runJob(self, process, Array(index), false)
>       res(0)
>     case None =>
>       self.filter(_._1 == key).map(_._2).collect()
>   }
> }
>
>
> 2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
> > Hi Daniel,
> >
> > I understood Sébastien was talking about having a high number of keys,
> > I guess I was prejudiced by my own problem! :) Anyway I don't think you
> > need to use disk or a database to generate an RDD per key, you can use
> > filter, which I guess would be more efficient because IO is avoided,
> > especially if the RDD was cached. For example:
> >
> > // in the spark shell
> > import org.apache.spark.rdd.RDD
> > import org.apache.spark.rdd.RDD._
> > import scala.reflect.ClassTag
> >
> > // generate a map from key to rdd of values
> > def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
> >     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
> >   val keys = pairRDD.keys.distinct.collect
> >   (for (k <- keys) yield
> >     k -> (pairRDD filter(_._1 == k) values)
> >   ) toMap
> > }
> >
> > // simple demo
> > val xs = sc.parallelize(1 to 1000)
> > val ixs = xs map(x => (x % 10, x))
> > val gs = groupByKeyToRDDs(ixs)
> > gs(1).collect
> >
> > Just an idea.
> >
> > Greetings,
> >
> > Juan Rodriguez
> >
> >
> > 2015-04-29 14:20 GMT+02:00 Daniel Darabos <
> > daniel.dara...@lynxanalytics.com>:
> >
> >> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice
> >> method for saving the RDD into separate files by key in a single pass.
> >> Then you can read the files into separate RDDs.
> >>
> >> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
> >> juan.rodriguez.hort...@gmail.com> wrote:
> >>
> >>> Hi Sébastien,
> >>>
> >>> I ran into a similar problem some time ago; you can see the discussion
> >>> on the Spark users mailing list at
> >>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
> >>> My experience was that when you create too many RDDs the Spark
> >>> scheduler gets stuck, so if you have many keys in the map you are
> >>> creating you'll probably have problems.
> >>>
> >>> On the other hand, the latest example I proposed in that mailing
> >>> thread was a batch job in which we start from a single RDD of
> >>> time-tagged data, transform it into a list of RDDs corresponding to
> >>> windows generated according to the time tag of the records, and then
> >>> apply an RDD transformation to each window RDD, like for example
> >>> KMeans.run from MLlib. This is very similar to what you propose.
> >>> So in my humble opinion the approach of generating thousands of RDDs
> >>> by filtering doesn't work, and a new RDD class should be implemented
> >>> for this. I have never implemented a custom RDD, but if you want some
> >>> help I would be happy to join you in this task.
> >>
> >> Sebastien said nothing about thousands of keys. This is a valid
> >> problem even if you only have two different keys.
> >>
> >>> Greetings,
> >>>
> >>> Juan
> >>>
> >>>
> >>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère <
> >>> s.sou...@gmail.com>:
> >>>
> >>> > Hello,
> >>> >
> >>> > I'm facing a problem with custom RDD transformations.
> >>> >
> >>> > I would like to transform an RDD[K, V] into a Map[K, RDD[V]],
> >>> > meaning a map of RDDs by key.
> >>> >
> >>> > This would be great, for example, in order to run MLlib clustering
> >>> > on the V values grouped by K.
> >>> >
> >>> > I know I could do it using filter() on my RDD as many times as I
> >>> > have keys, but I'm afraid this would not be efficient (the entire
> >>> > RDD would be read each time, right?). Then, I could use
> >>> > mapPartitions on my RDD before filtering, but the code ends up
> >>> > being huge...
> >>> >
> >>> > So, I tried to create a custom RDD to implement a splitByKey(rdd:
> >>> > RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD
> >>> > only once, but I couldn't complete the implementation.
> >>> >
> >>> > Please, could you tell me first if this is really feasible, and
> >>> > then, could you give me some pointers?
> >>> >
> >>> > Thank you,
> >>> > Regards,
> >>> > Sebastien
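P.S. This is the sketch I mentioned at the top, illustrating why the map is cheap to build. It is just a condensed, hypothetical variant of Juan's groupByKeyToRDDs above, pasted into the spark-shell with made-up demo data (the names pairs/byKey and the key 3 are mine):

import org.apache.spark.rdd.RDD

val pairs: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 10, x)).cache()

// collecting the distinct keys is the only job that runs while building the map
val keys = pairs.keys.distinct.collect

// the per-key filters are only defined here, not executed
val byKey: Map[Int, RDD[Int]] = keys.map(k => k -> pairs.filter(_._1 == k).values).toMap

// a filter runs only when you call an action on that particular key's RDD
byKey(3).count()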