Good catch. `join` should use `Iterator`, too. I opened a JIRA here: https://issues.apache.org/jira/browse/SPARK-4824
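For illustration, the iterator-based variant might look roughly like this (a sketch only, mirroring the 1.1.1 code quoted below, not the actual patch attached to that ticket):

    // Sketch: same shape as PairRDDFunctions.join in 1.1.1 (quoted below),
    // but iterating both cogrouped sides lazily, so the per-key cartesian
    // product is produced on demand instead of being built all at once.
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
      this.cogroup(other, partitioner).flatMapValues( pair =>
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      )
    }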
Best Regards,
Shixiong Zhu

2014-12-10 21:35 GMT+08:00 Johannes Simon <[email protected]>:

> Hi!
>
> Using an iterator solved the problem! I've been chewing on this for days,
> so thanks a lot to both of you!! :)
>
> Since in an earlier version of my code I used a self-join to perform the
> same thing, and ran into the same problems, I just looked at the
> implementation of PairRDDFunctions.join (Spark v1.1.1):
>
>   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>     this.cogroup(other, partitioner).flatMapValues( pair =>
>       for (v <- pair._1; w <- pair._2) yield (v, w)
>     )
>   }
>
> Is there a reason not to use an iterator here if possible? Pardon my lack
> of Scala knowledge... This should in any case cause the same problems I had
> when the size of vs/ws gets too large. (Though that question is more of a
> dev ml question.)
>
> Thanks!
> Johannes
>
> Am 10.12.2014 um 13:44 schrieb Shixiong Zhu <[email protected]>:
>
> for (v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all the
> data at once and return all of it to flatMap.
>
> To solve your problem, you should use for (v1 <- values.iterator; v2 <-
> values.iterator) yield ((v1, v2), 1), which will generate the data only
> when it's necessary.
>
> Best Regards,
> Shixiong Zhu
>
> 2014-12-10 20:13 GMT+08:00 Johannes Simon <[email protected]>:
>
>> Hi!
>>
>> I have been using Spark a lot recently and it's been running really well
>> and fast, but now when I increase the data size, it's starting to run into
>> problems: I have an RDD in the form of (String, Iterable[String]) - the
>> Iterable[String] was produced by a groupByKey() - and I perform a flatMap
>> on it that outputs some form of cartesian product of the values per key:
>>
>>   rdd.flatMap({case (key, values) => for (v1 <- values; v2 <- values) yield ((v1, v2), 1)})
>>
>> So the runtime cost per RDD entry is O(n^2) where n is the number of
>> values. This n can sometimes be 10,000 or even 100,000. That produces a
>> lot of data, I am aware of that, but otherwise I wouldn't need a cluster,
>> would I? :) For n <= 1,000 this operation works quite well. But as soon as
>> I allow n to be 10,000 or higher, I start to get "GC overhead limit
>> exceeded" exceptions.
>>
>> Configuration:
>> - 7g executor memory
>> - spark.shuffle.memoryFraction=0.5
>> - spark.storage.memoryFraction=0.1
>> I am not sure how the remaining memory for the actual JVM instance
>> performing the flatMap is computed, but I would assume it to be something
>> like (1 - 0.5 - 0.1) * 7g = 2.8g.
>>
>> Now I am wondering: why should these 2.8g (or even just a few hundred MB)
>> not suffice for Spark to process this flatMap without too much GC
>> overhead? If I assume a string to be 10 characters on average, therefore
>> consuming about 60 bytes with overhead taken into account, then 10,000 of
>> these values sum up to no more than ~600kb, and apart from that Spark
>> never has to keep anything else in memory.
>>
>> My question: when does Spark start to spill RDD entries to disk, assuming
>> that no RDD is to be persisted? Does it keep all output of the flatMap
>> operation in memory until the entire flatMap is done? Or does it already
>> spill every single yielded "((v1, v2), 1)" entry out to disk if necessary?
>>
>> Thanks a lot!
>> Johannes
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
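For anyone finding this thread in the archives, here is a minimal, self-contained sketch of the difference discussed above. The sample data, app name, and local master are made up for illustration; only the two flatMap bodies matter:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD implicits (needed for groupByKey in Spark 1.x)

    object IteratorFlatMapSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("iterator-flatmap-sketch").setMaster("local[*]"))

        // Stand-in for the (String, Iterable[String]) RDD produced by groupByKey()
        // in the original question.
        val grouped = sc.parallelize(Seq(("k1", "a"), ("k1", "b"), ("k1", "c"), ("k2", "x")))
          .groupByKey()

        // Eager: the for-comprehension over the Iterable builds the entire per-key
        // cartesian product as one in-memory collection before flatMap emits it.
        val eager = grouped.flatMap { case (_, values) =>
          for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
        }

        // Lazy: iterating over values.iterator yields one pair at a time, so the
        // O(n^2) product per key is never materialized all at once.
        val lazyPairs = grouped.flatMap { case (_, values) =>
          for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
        }

        // Same logical output, very different memory behavior for large groups.
        println(eager.count() == lazyPairs.count())

        sc.stop()
      }
    }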
