Hi Nick,

How does reduce work? I thought that after the reduce runs within each
executor, the partial results would be reduced in parallel across multiple
executors, instead of everything being pulled to the driver and reduced there.
Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Mon, Jun 9, 2014 at 11:07 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> Can you key your RDD by some key and use reduceByKey? In fact, if you are
> merging a bunch of maps, you can emit a set of (k, v) pairs in your
> mapPartitions and then reduceByKey with some merge function. The reduce
> will then happen in parallel on multiple nodes. You'll end up with just a
> single set of (k, v) pairs per partition, which you can reduce or collect
> and merge on the driver.
>
>
> —
> Sent from Mailbox
>
>
> On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>>
>> I suppose what I want is the memory efficiency of toLocalIterator and the
>> speed of collect. Is there any such thing?
>>
>>
>> On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
>> wrote:
>>>
>>> Hello,
>>>
>>> I noticed that the final reduce happens on the driver node, with code
>>> that looks like the following:
>>>
>>> val outputMap = rdd.mapPartitions(doSomething).reduce((a, b) => a.merge(b))
>>>
>>> Although the individual outputs from the mappers are small, over time
>>> the aggregated result outputMap can become huge (say, hundreds of
>>> millions of keys and values, reaching gigabytes).
>>>
>>> I noticed that, even if we have a lot of memory on the driver node,
>>> this process eventually becomes really slow (say we have 100+
>>> partitions: the first reduce is fast, but it becomes progressively
>>> slower as more and more partition outputs get aggregated). Is this
>>> because the intermediate reduce output gets serialized and then
>>> deserialized every time?
>>>
>>> Ideally, since the reduce is taking place on the same machine anyway,
>>> there should be no need for any serialization and deserialization; the
>>> incoming results could simply be aggregated into the final aggregation.
>>> Is this possible?
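
For reference, a minimal Scala sketch of the mapPartitions + reduceByKey
pattern Nick describes above. The toy job (per-key counts over an
RDD[String]) and the names buildLocalMap, mergeValues, and mergeInParallel
are illustrative assumptions, not anything from this thread; the point is
only that the merge runs in parallel on the executors, and the driver only
collects (k, v) pairs that have already been merged.

    import org.apache.spark.SparkContext._   // pair-RDD implicits; needed on Spark 1.0-1.2, harmless later
    import org.apache.spark.rdd.RDD

    // Hypothetical per-partition work: build a local Map[String, Long] of
    // counts from the rows in one partition. Stands in for whatever the
    // real job computes per partition.
    def buildLocalMap(rows: Iterator[String]): Map[String, Long] =
      rows.foldLeft(Map.empty[String, Long]) { (m, r) =>
        m.updated(r, m.getOrElse(r, 0L) + 1L)
      }

    // Hypothetical merge function for two values that share the same key.
    def mergeValues(a: Long, b: Long): Long = a + b

    def mergeInParallel(input: RDD[String]): Map[String, Long] =
      input
        // Emit (k, v) pairs from each partition instead of one big Map per partition.
        .mapPartitions(rows => buildLocalMap(rows).iterator)
        // The merge happens in parallel on the executors, keyed by k,
        // rather than sequentially on the driver.
        .reduceByKey(mergeValues)
        // Only the already-merged (k, v) pairs are pulled back to the driver.
        .collect()
        .toMap

If the final map really has hundreds of millions of keys, collecting it to
the driver at all may still be the bottleneck; in that case you can keep the
result as the reduced pair RDD and save or query it distributed instead of
calling collect.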