Yes, HashPartitioner is the default, but (a) just creating an RDD of pairs doesn't apply any partitioner, and (b) both RDDs have to have the same number of partitions (it's not enough for them to just have a HashPartitioner). You only get a narrow dependency if you have the same number of partitions on both sides of the join / cogroup.
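For instance, a quick check in the spark-shell shows both points (just a sketch, the variable names are made up):

import org.apache.spark.HashPartitioner

// (a) an RDD of pairs has no partitioner until you explicitly apply one
val pairs = sc.parallelize(1 to 100).map(x => (x % 10) -> x)
println(pairs.partitioner)          // None

// (b) only matching partition counts give you the narrow dependency
val a = sc.parallelize(1 to 100).map(x => (x % 10) -> x).partitionBy(new HashPartitioner(10))
val b = sc.parallelize(1 to 200).map(x => (x % 10) -> x).partitionBy(new HashPartitioner(10))
val c = sc.parallelize(1 to 200).map(x => (x % 10) -> x).partitionBy(new HashPartitioner(20))

println(a.join(b).toDebugString)    // both sides HashPartitioner(10): the join adds no new shuffle
println(a.join(c).toDebugString)    // 10 vs 20 partitions: one side has to be shuffled again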
You don't have to use the same object by identity; both partitioners just need to be considered the "same" by equals(), and HashPartitioner considers any hash partitioner with the same number of partitions to be equal. And you can try it out yourself in the REPL too, you know :)

On Sat, May 23, 2015 at 11:55 AM, Shay Seng <s...@urbanengines.com> wrote:

> Thanks!
>
> I was getting a little confused by this partitioner business. I thought
> that by default a pairRDD would be partitioned by a HashPartitioner? Was
> this possibly the case in 0.9.3 but not in 1.x?
>
> In any case, I tried your suggestion and the shuffle was removed. Cheers.
>
> One small question though: is it important that the same hash partitioner
> object be used? E.g. could I have written this instead:
>
> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(new org.apache.spark.HashPartitioner(10))
> (0 until 5).foreach { idx =>
>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(new org.apache.spark.HashPartitioner(10))
>   println(idx + " ---> " + otherData.join(d3).count())
> }
>
> On Fri, May 22, 2015 at 1:10 PM, Imran Rashid <iras...@cloudera.com> wrote:
>
>> This is a pretty interesting case. I think the fact that stage 519 says
>> it's doing the map at reconstruct.scala:153 is somewhat misleading -- it's
>> just the best ID it has for what it's working on. Really, it's just writing
>> the map output data for the shuffle. You can see that the stage generated
>> 8.4 GB of shuffle output. But it's not actually recomputing your RDD; it's
>> just spending that time taking the data from memory and writing the
>> appropriate shuffle outputs to disk.
>>
>> Here's an example which shows that the RDD isn't really getting
>> recomputed each time. I generate some data with a fake "slow" function
>> (sleep for 5 seconds) and then cache it. Then that data set is joined
>> against a few other datasets.
>>
>> val d1 = sc.parallelize(1 to 100).mapPartitions { itr =>
>>   Thread.sleep(5000)
>>   itr.map { x => (x % 10) -> x }
>> }.cache()
>> d1.count()
>>
>> (0 until 5).foreach { idx =>
>>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }
>>   println(idx + " ---> " + otherData.join(d1).count())
>> }
>>
>> If you run this and look at the UI, the stages that are run are very
>> similar to what you see in your example. It seems that "d1" gets run many
>> times, but actually it's just generating the shuffle map output many times.
>> You will only have that long 5-second sleep happen once.
>>
>> But you can actually do even better in this case. You can use the idea
>> of narrow dependencies to make Spark write the shuffle output for the
>> shared dataset only one time. Consider this example instead:
>>
>> val partitioner = new org.apache.spark.HashPartitioner(10)
>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
>> (0 until 5).foreach { idx =>
>>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>>   println(idx + " ---> " + otherData.join(d3).count())
>> }
>>
>> This time, "d3" isn't even cached. But because d3 and otherData use the
>> same partitioner, Spark knows it doesn't need to re-shuffle d3 each time.
>> It can use the existing shuffle output it already has sitting on disk, so
>> you'll see the stage is "skipped" in the UI (except for the first job).
>>
>> On Fri, May 22, 2015 at 11:59 AM, Shay Seng <s...@urbanengines.com> wrote:
>>
>>> Hi.
>>>
>>> I have an RDD that I use repeatedly through many iterations of an
>>> algorithm. To prevent recomputation, I persist the RDD (and incidentally I
>>> also persist and checkpoint its parents).
>>>
>>> val consCostConstraintMap = consCost.join(constraintMap).map {
>>>   case (cid, (costs, (mid1, _, mid2, _, _))) =>
>>>     (cid, (costs, mid1, mid2))
>>> }
>>> consCostConstraintMap.setName("consCostConstraintMap")
>>> consCostConstraintMap.persist(MEMORY_AND_DISK_SER)
>>>
>>> ...
>>>
>>> Later on, in an iterative loop, I do:
>>>
>>> val update = updatedTrips.join(consCostConstraintMap).flatMap {
>>>   ...
>>> }.treeReduce()
>>>
>>> ---------
>>>
>>> I can see from the UI that consCostConstraintMap is in storage:
>>>
>>> RDD Name              | Storage Level                   | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
>>> consCostConstraintMap | Memory Serialized 1x Replicated | 600               | 100%            | 15.2 GB        | 0.0 B           | 0.0 B
>>>
>>> ---------
>>>
>>> In the Jobs list I see the following pattern, where each treeReduce line
>>> corresponds to one iteration of the loop:
>>>
>>> Job Id | Description                         | Submitted           | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
>>> 13     | treeReduce at reconstruct.scala:243 | 2015/05/22 16:27:11 | 2.9 min  | 16/16 (194 skipped)     | 9024/9024 (109225 skipped)
>>> 12     | treeReduce at reconstruct.scala:243 | 2015/05/22 16:24:16 | 2.9 min  | 16/16 (148 skipped)     | 9024/9024 (82725 skipped)
>>> 11     | treeReduce at reconstruct.scala:243 | 2015/05/22 16:21:21 | 2.9 min  | 16/16 (103 skipped)     | 9024/9024 (56280 skipped)
>>> 10     | treeReduce at reconstruct.scala:243 | 2015/05/22 16:18:28 | 2.9 min  | 16/16 (69 skipped)      | 9024/9024 (36980 skipped)
>>>
>>> --------------
>>>
>>> If I drill into one job, I see:
>>>
>>> Completed Stages: 16
>>> Skipped Stages: 148
>>>
>>> Completed Stages (16)
>>> Stage Id | Description                         | Submitted           | Duration | Tasks: Succeeded/Total | Input | Output | Shuffle Read | Shuffle Write
>>> 525      | treeReduce at reconstruct.scala:243 | 2015/05/22 16:27:09 | 42 ms    | 24/24                  |       |        | 21.7 KB      |
>>> 524      | ...
>>> 519      | map at reconstruct.scala:153        | 2015/05/22 16:24:16 | 1.2 min  | 600/600                | 14.8 GB |      |              | 8.4 GB
>>>
>>> The last line, "map at reconstruct.scala:153", corresponds to
>>> "val consCostConstraintMap = consCost.join(constraintMap).map {",
>>> which I expected to have been cached.
>>> Is there some way I can find out what it is spending 1.2 min doing? I
>>> presume reading and writing GB of data, but why? Everything should be in
>>> memory.
>>>
>>> Any clues on where I should start?
>>>
>>> tks
>>>
>>
>
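For what it's worth, applied to the code from your original mail the same trick might look roughly like this. Just a sketch: the 600-partition count is only taken from the cached-partition count shown in the UI, I've swapped the map for mapValues so the partitioner is preserved, and the elided flatMap/treeReduce logic is left elided:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER

val partitioner = new HashPartitioner(600)   // assumption: match the 600 cached partitions

// Join straight into the shared partitioner; mapValues (unlike map) keeps the
// partitioner, so the persisted RDD stays partitioned by `partitioner`.
val consCostConstraintMap = consCost
  .join(constraintMap, partitioner)
  .mapValues { case (costs, (mid1, _, mid2, _, _)) => (costs, mid1, mid2) }
consCostConstraintMap.setName("consCostConstraintMap")
consCostConstraintMap.persist(MEMORY_AND_DISK_SER)

// Later, inside the iterative loop: partition the per-iteration side with the
// same partitioner so the join sees equal partitioners on both sides and gets
// narrow dependencies -- the cached 15 GB side is then read from memory
// instead of being re-sorted and written out as shuffle data every iteration.
val update = updatedTrips
  .partitionBy(partitioner)
  .join(consCostConstraintMap)
  .flatMap { ... }       // original flatMap body, unchanged
  .treeReduce(...)       // original combine function, unchanged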