I wonder if the issue is that these lines just need to add preservesPartitioning = True?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ PySpark.
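To make the hypothesis concrete: in PySpark, transformations drop the partitioner unless they promise not to change keys. A minimal, untested sketch of the effect (plain PySpark, not the join.py internals; assumes a version that exposes rdd.partitioner):

    from pyspark import SparkContext

    sc = SparkContext(appName="preserves-partitioning-sketch")
    rdd = sc.parallelize([(i, i) for i in range(1000)]).partitionBy(8)

    # Default: the partitioner is dropped, so a later join has to re-shuffle.
    dropped = rdd.mapPartitions(lambda it: ((k, v * 2) for k, v in it))

    # With preservesPartitioning=True the partitioner is kept -- only safe
    # here because the keys are untouched.
    kept = rdd.mapPartitions(lambda it: ((k, v * 2) for k, v in it),
                             preservesPartitioning=True)

    print(dropped.partitioner)  # None
    print(kept.partitioner)     # not None -- same partitioning as rdd

If whatever join.py builds internally keeps that flag off, the joined RDD would report no partitioner even when both parents were partitioned identically, which would match what Karlson is seeing.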
On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:

> ah, sorry, I am not too familiar w/ pyspark, so I missed that part. It
> could be that pyspark doesn't properly support narrow dependencies, or
> maybe you need to be more explicit about the partitioner. I am looking
> into the pyspark api, but you might have better guesses here than I do.
>
> My suggestion to do
>
> joinedRdd.getPartitions.foreach{println}
>
> was just to see if the partition was a NarrowCoGroupSplitDep or a
> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, since those
> fields are hidden deeper inside and are not user-visible. A better way (in
> scala, anyway) is to look at rdd.dependencies. It's a little tricky,
> though: you need to look deep into the lineage (example at the end).
>
> Sean -- yes, it does require both RDDs to have the same partitioner, but
> that should happen naturally if you just specify the same number of
> partitions; you'll get equal HashPartitioners. There is a little difference
> in the scala & python api that I missed here. For partitionBy in scala you
> actually need to specify the partitioner, but not in python. However, I
> thought it would work like groupByKey, which does just take an int.
>
> Here's a code example in scala -- not sure what is available from python.
> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>
>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> scala> d.partitioner == d2.partitioner
>> res2: Boolean = true
>> val joined = d.join(d2)
>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>> val badJoined = d.join(d3)
>>
>> d.setName("d")
>> d2.setName("d2")
>> d3.setName("d3")
>> joined.setName("joined")
>> badJoined.setName("badJoined")
>>
>> // unfortunately, just looking at the immediate dependencies of joined &
>> // badJoined is misleading, b/c join actually creates one more step after
>> // the shuffle
>> scala> joined.dependencies
>> res20: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@74751ac8)
>> // even with the join that does require a shuffle, we still see a
>> // OneToOneDependency, but that's just a simple flatMap step
>> scala> badJoined.dependencies
>> res21: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>> // so let's make a helper function to get all the dependencies recursively
>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>   val deps = rdd.dependencies
>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>> }
>>
>> // full dependencies of the good join
>> scala> flattenDeps(joined).foreach{println}
>> (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>> (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
>> *(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)*
>> (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>> (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
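Since PySpark doesn't expose rdd.dependencies, the closest Python-side check I can think of is comparing partitioners and reading the lineage from toDebugString() -- a rough, untested sketch (variable names made up), and whether the PySpark join actually comes out narrow is exactly what this thread is questioning:

    a = sc.parallelize([(x, x) for x in range(1000000)]).partitionBy(64)
    b = sc.parallelize([(x, x) for x in range(3, 1000000)]).partitionBy(64)
    c = sc.parallelize([(x, x) for x in range(3, 1000000)]).partitionBy(100)

    # Same number of partitions and same hash function should compare equal.
    print(a.partitioner == b.partitioner)  # expected True
    print(a.partitioner == c.partitioner)  # expected False (64 vs 100)

    good = a.join(b)
    bad = a.join(c)

    # toDebugString() prints the lineage; an extra shuffle stage feeding the
    # join, beyond the two partitionBy shuffles, is the red flag.
    print(good.toDebugString())
    print(bad.toDebugString())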
>> // full dependencies of the bad join -- notice the ShuffleDependency!
>> scala> flattenDeps(badJoined).foreach{println}
>> (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>> (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>> *(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)*
>> (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
>> (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>
> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>
>> Hi Imran,
>>
>> thanks for your quick reply.
>>
>> Actually I am doing this:
>>
>> rddA = rddA.partitionBy(n).cache()
>> rddB = rddB.partitionBy(n).cache()
>>
>> followed by
>>
>> rddA.count()
>> rddB.count()
>>
>> then
>>
>> joinedRdd = rddA.join(rddB)
>>
>> I thought that the count() would force the evaluation, so any subsequent
>> join would be shuffle-free. I was wrong about the shuffle amounts, however.
>> The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while
>> there is no shuffle read. A joinedRdd.count() does a shuffle read of about
>> 1GB in size, though.
>>
>> The getPartitions method does not exist on the resulting RDD (I am using
>> the Python API). There is however foreachPartition(). What is the line
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> supposed to do?
>>
>> Thank you,
>>
>> Karlson
>>
>> PS: Sorry for sending this twice, I accidentally did not reply to the
>> mailing list first.
>>
>> On 2015-02-12 16:48, Imran Rashid wrote:
>>
>>> Hi Karlson,
>>>
>>> I think your assumptions are correct -- that join alone shouldn't require
>>> any shuffling. But it's possible you are getting tripped up by lazy
>>> evaluation of RDDs. After you do your partitionBy, are you sure those
>>> RDDs are actually materialized & cached somewhere? E.g., if you just did
>>> this:
>>>
>>> val rddA = someData.partitionBy(N)
>>> val rddB = someOtherData.partitionBy(N)
>>> val joinedRdd = rddA.join(rddB)
>>> joinedRdd.count() // or any other action
>>>
>>> then the partitioning isn't actually getting run until you do the join.
>>> So though the join itself can happen without repartitioning,
>>> joinedRdd.count() will trigger the evaluation of rddA & rddB, which will
>>> require shuffles. Note that even if you have some intervening action on
>>> rddA & rddB that shuffles them, unless you persist the result, you will
>>> need to reshuffle them for the join.
>>>
>>> If this doesn't help explain things, for debugging you can run
>>>
>>> joinedRdd.getPartitions.foreach{println}
>>>
>>> This is getting into the weeds, but at least it will tell us whether or
>>> not you are getting narrow dependencies, which would avoid the shuffle.
>>> (Does anyone know of a simpler way to check this?)
>>>
>>> hope this helps,
>>> Imran
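In PySpark, the pattern Imran describes would look roughly like this (sketch only; some_data, some_other_data and N are placeholders). The point is that the partitionBy shuffle is paid once, up front, and the cached result is what the join reads:

    from pyspark import StorageLevel

    N = 64  # hypothetical partition count

    rddA = some_data.partitionBy(N).persist(StorageLevel.MEMORY_AND_DISK)
    rddB = some_other_data.partitionBy(N).persist(StorageLevel.MEMORY_AND_DISK)

    # Force materialization so the partitioned data is actually cached before
    # the join is planned; otherwise partitionBy runs as part of the join job
    # and its shuffle write shows up there.
    rddA.count()
    rddB.count()

    # Same number of partitions on both sides -> equal hash partitioners.
    joined = rddA.join(rddB, N)
    joined.count()

Note this is essentially what Karlson already reports doing above, which is why the suspicion shifts to the PySpark join implementation itself rather than to lazy evaluation.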
>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>
>>>> Hi All,
>>>>
>>>> using PySpark, I create two RDDs (one with about 2M records (~200MB),
>>>> the other with about 8M records (~2GB)) of the format (key, value).
>>>>
>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>>> both RDDs have the same number of partitions and that equal keys reside
>>>> on the same partition (via mapPartitionsWithIndex).
>>>>
>>>> Now I'd expect that for a join on the two RDDs no shuffling is
>>>> necessary. Looking at the Web UI under http://driver:4040 however
>>>> reveals that this assumption is false.
>>>>
>>>> In fact I am seeing shuffle writes of about 200MB and reads of about
>>>> 50MB.
>>>>
>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>> assumption?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Karlson
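For completeness, here is roughly the kind of mapPartitionsWithIndex check Karlson describes (untested sketch; rddA/rddB stand for the two partitioned RDDs and the helper names are made up; collecting every key to the driver is only sensible as a one-off sanity check or on a sample):

    def key_to_partition(rdd):
        # Map every key to the index of the partition it lives in.
        def tag(idx, it):
            for k, _ in it:
                yield (k, idx)
        return dict(rdd.mapPartitionsWithIndex(tag).collect())

    loc_a = key_to_partition(rddA)
    loc_b = key_to_partition(rddB)

    # For the join to be shuffle-free, every key present in both RDDs must
    # sit in the same partition index on both sides.
    common = set(loc_a) & set(loc_b)
    mismatched = [k for k in common if loc_a[k] != loc_b[k]]
    print(len(mismatched))  # 0 if the co-partitioning really holds

Even with this check passing, the shuffle Karlson sees would then come down to how the PySpark join builds its intermediate RDDs, which is what the preservesPartitioning hypothesis at the top of the thread is about.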