Thanks!

I was getting a little confused by this partitioner business; I thought that
by default a pair RDD would be partitioned by a HashPartitioner. Was this
possibly the case in 0.9.3 but not in 1.x?

In any case, I tried your suggestion and the shuffle was removed. Cheers.

One small question, though: is it important that the same HashPartitioner
instance be used? For example, could I have written this instead:

val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }
  .partitionBy(new org.apache.spark.HashPartitioner(10))

(0 until 5).foreach { idx =>
  val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }
    .partitionBy(new org.apache.spark.HashPartitioner(10))
  println(idx + " ---> " + otherData.join(d3).count())
}
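
For what it's worth, a quick check in the shell (just a sketch, assuming 1.x
behaviour) suggests two HashPartitioners with the same partition count compare
equal even though they are separate instances, which is what made me wonder:

import org.apache.spark.HashPartitioner

// Two separate instances with the same number of partitions.
val p1 = new HashPartitioner(10)
val p2 = new HashPartitioner(10)

// HashPartitioner equality is based on the partition count, so these
// should compare equal even though they are distinct objects.
println(p1 == p2)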






On Fri, May 22, 2015 at 1:10 PM, Imran Rashid <[email protected]> wrote:

> This is a pretty interesting case. I think the fact that stage 519 says
> it's doing the map at reconstruct.scala:153 is somewhat misleading -- that's
> just the best ID it has for what it's working on. Really, it's just writing
> the map output data for the shuffle. You can see that the stage generated
> 8.4 GB of shuffle output. But it's not actually recomputing your RDD; it's
> just spending that time taking the data from memory and writing the
> appropriate shuffle outputs to disk.
>
> Here's an example that shows the RDD isn't really getting recomputed each
> time. I generate some data with a fake "slow" function (sleep for 5
> seconds) and then cache it. Then that data set is joined against several
> other datasets, one per iteration.
>
> val d1 = sc.parallelize(1 to 100).mapPartitions { itr =>
>   Thread.sleep(5000)
>   itr.map { x => (x % 10) -> x }
> }.cache()
> d1.count()
>
> (0 until 5).foreach { idx =>
>   val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % 10) -> x}
>   println(idx + " ---> " + otherData.join(d1).count())
> }
>
> If you run this and look at the UI, the stages that are run are very similar
> to what you see in your example. It looks as if "d1" gets run many times,
> but actually it's just generating the shuffle map output many times. The
> long 5-second sleep only happens once.
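>
> If you want to double-check that d1 is really being served from the cache,
> one rough way (just a sketch, not something you need to run) is to look at
> its lineage and storage level:
>
> // toDebugString prints the RDD lineage; a cached RDD is annotated with its
> // storage level in that output.
> println(d1.toDebugString)
> // getStorageLevel reports the level set by cache() / persist().
> println(d1.getStorageLevel)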
>
> But you can actually do even better in this case. You can use the idea of
> narrow dependencies to make Spark write the shuffle output for the shared
> dataset only one time. Consider this example instead:
>
> val partitioner = new org.apache.spark.HashPartitioner(10)
>
> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }
>   .partitionBy(partitioner)
>
> (0 until 5).foreach { idx =>
>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }
>     .partitionBy(partitioner)
>   println(idx + " ---> " + otherData.join(d3).count())
> }
>
> This time, "d3" isn't even cached. But because d3 and otherData use the
> same partitioner, Spark knows it doesn't need to re-shuffle d3 each time.
> It can use the existing shuffle output it already has sitting on disk. So
> you'll see the stage is "skipped" in the UI (except for the first job).
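>
> As a rough sanity check (just a sketch, not required), you can confirm that
> the shared dataset remembers its partitioner, which is what lets the join
> build a narrow dependency on it and reuse the existing shuffle output:
>
> // partitioner is Some(...) because d3 was explicitly partitioned; any RDD
> // partitioned with an equal partitioner can be joined against it without
> // re-shuffling d3.
> println(d3.partitioner)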
>
>
>
>
> On Fri, May 22, 2015 at 11:59 AM, Shay Seng <[email protected]> wrote:
>
>> Hi.
>>
>> I have an RDD that I use repeatedly through many iterations of an
>> algorithm. To prevent recomputation, I persist the RDD (and, incidentally,
>> I also persist and checkpoint its parents).
>>
>>
>> val consCostConstraintMap = consCost.join(constraintMap).map {
>>   case (cid, (costs,(mid1,_,mid2,_,_))) => {
>>     (cid, (costs, mid1, mid2))
>>   }
>> }
>> consCostConstraintMap.setName("consCostConstraintMap")
>> consCostConstraintMap.persist(MEMORY_AND_DISK_SER)
>>
>> ...
>>
>> later on in an iterative loop
>>
>> val update = updatedTrips.join(consCostConstraintMap).flatMap {
>>       ...
>> }.treeReduce()
>>
>> ---------
>>
>> I can see from the UI that consCostConstraintMap is in storage:
>>
>> RDD Name:          consCostConstraintMap
>> Storage Level:     Memory Serialized 1x Replicated
>> Cached Partitions: 600
>> Fraction Cached:   100%
>> Size in Memory:    15.2 GB
>> Size in Tachyon:   0.0 B
>> Size on Disk:      0.0 B
>> ---------
>> In the Jobs list I see the following pattern, where each treeReduce line
>> corresponds to one iteration of the loop:
>>
>> Job Id  Description                          Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
>> 13      treeReduce at reconstruct.scala:243  2015/05/22 16:27:11  2.9 min   16/16 (194 skipped)      9024/9024 (109225 skipped)
>> 12      treeReduce at reconstruct.scala:243  2015/05/22 16:24:16  2.9 min   16/16 (148 skipped)      9024/9024 (82725 skipped)
>> 11      treeReduce at reconstruct.scala:243  2015/05/22 16:21:21  2.9 min   16/16 (103 skipped)      9024/9024 (56280 skipped)
>> 10      treeReduce at reconstruct.scala:243  2015/05/22 16:18:28  2.9 min   16/16 (69 skipped)       9024/9024 (36980 skipped)
>>
>> --------------
>> If I drill into one of these jobs I see:
>>
>> Completed Stages: 16
>> Skipped Stages:   148
>>
>> Completed Stages (16)
>>
>> Stage Id  Description                          Submitted            Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
>> 525       treeReduce at reconstruct.scala:243  2015/05/22 16:27:09  42 ms     24/24                            21.7 KB
>> 524       .......
>> ...
>> 519       map at reconstruct.scala:153         2015/05/22 16:24:16  1.2 min   600/600                 14.8 GB                8.4 GB
>>
>> The last line, map at reconstruct.scala:153, corresponds to
>> "val consCostConstraintMap = consCost.join(constraintMap).map {",
>> which I expected to have been cached.
>> Is there some way I can find out what it is spending 1.2 minutes doing? I
>> presume it is reading and writing GB of data, but why? Everything should
>> be in memory.
>>
>> Any clues on where I should start?
>>
>> tks
>>
>>
>
