This will be fixed by https://github.com/apache/spark/pull/4629
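
A quick way to see, from the pyspark shell, the map vs. mapValues difference that comes up below is to compare the partitioner attribute before and after each transformation. This is only a rough sketch, assuming an existing SparkContext `sc` and a PySpark build that exposes RDD.partitioner. (As Imran notes below, preserving the partitioner alone isn't enough to get a narrow-dependency join in pyspark; the join path itself needs the bigger change tracked in SPARK-5785.)

    # Rough sketch: a plain map() drops the partitioner, while mapValues() keeps it.
    rdd = sc.parallelize([(i, i) for i in range(1000)]).partitionBy(8)

    a = rdd.map(lambda kv: (kv[0], (1, kv[1])))  # plain map: partitioner is dropped
    b = rdd.mapValues(lambda v: (1, v))          # mapValues: partitioner is preserved

    print(rdd.partitioner, a.partitioner, b.partitioner)
    # expected: a.partitioner is None, b.partitioner matches rdd.partitioner
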
On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
> yeah, I thought the same thing at first too. I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough: the join is done by union-ing the two transformed rdds, which is very different from the way it works under the hood in scala to enable narrow dependencies. It really needs a bigger change to pyspark. I filed this issue:
> https://issues.apache.org/jira/browse/SPARK-5785
>
> (and the somewhat related issue about documentation:
> https://issues.apache.org/jira/browse/SPARK-5786)
>
> Partitioning should still work in pyspark: you still need some notion of distributing work, and the pyspark functions have a partitionFunc to decide that. But I am not an authority on pyspark, so perhaps there are more holes I'm not aware of ...
>
> Imran
>
> On Fri, Feb 13, 2015 at 8:36 AM, Karlson <ksonsp...@siberie.de> wrote:
>>
>> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
>> wouldn't it help to change the lines
>>
>> vs = rdd.map(lambda (k, v): (k, (1, v)))
>> ws = other.map(lambda (k, v): (k, (2, v)))
>>
>> to
>>
>> vs = rdd.mapValues(lambda v: (1, v))
>> ws = other.mapValues(lambda v: (2, v))
>>
>> ?
>> As I understand it, this would preserve the original partitioning.
>>
>> On 2015-02-13 12:43, Karlson wrote:
>>>
>>> Does that mean partitioning does not work in Python? Or does this only affect joining?
>>>
>>> On 2015-02-12 19:27, Davies Liu wrote:
>>>>
>>>> The feature works as expected in Scala/Java, but it is not implemented in Python.
>>>>
>>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>>>>
>>>>> I wonder if the issue is that these lines just need to add preservesPartitioning = true?
>>>>>
>>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>>
>>>>> I am getting the feeling this is an issue w/ pyspark.
>>>>>
>>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
>>>>>>
>>>>>> Ah, sorry, I am not too familiar w/ pyspark and I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api, but you might have better guesses here than I do.
>>>>>>
>>>>>> My suggestion to do
>>>>>>
>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. I think a better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).
>>>>>>
>>>>>> Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions; you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here: for partitionBy in scala you actually need to specify the partitioner, but not in python. However, I thought it would work like groupByKey, which does just take an int.
>>>>>>
>>>>>> Here's a code example in scala -- not sure what is available from python.
>>>>>> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>>>>>
>>>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>>>> scala> d.partitioner == d2.partitioner
>>>>>>> res2: Boolean = true
>>>>>>> val joined = d.join(d2)
>>>>>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>>>>>> val badJoined = d.join(d3)
>>>>>>>
>>>>>>> d.setName("d")
>>>>>>> d2.setName("d2")
>>>>>>> d3.setName("d3")
>>>>>>> joined.setName("joined")
>>>>>>> badJoined.setName("badJoined")
>>>>>>>
>>>>>>> // unfortunately, just looking at the immediate dependencies of joined &
>>>>>>> // badJoined is misleading, b/c join actually creates one more step after the shuffle
>>>>>>> scala> joined.dependencies
>>>>>>> res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
>>>>>>> // even with the join that does require a shuffle, we still see a OneToOneDependency,
>>>>>>> // but that's just a simple flatMap step
>>>>>>> scala> badJoined.dependencies
>>>>>>> res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>>>
>>>>>>> // so let's make a helper function to get all the dependencies recursively
>>>>>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>>>>>   val deps = rdd.dependencies
>>>>>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>>>>>> }
>>>>>>>
>>>>>>> // full dependencies of the good join
>>>>>>> scala> flattenDeps(joined).foreach{println}
>>>>>>> (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>>>>>> (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>>>>>> (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>>>>>> (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>>>>>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>>> (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>>>>>> (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>>>>>
>>>>>>> // full dependencies of the bad join -- notice the ShuffleDependency!
>>>>>>> scala> flattenDeps(badJoined).foreach{println}
>>>>>>> (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>>> (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>>>>>> (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>>>>>> (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>>>>>> (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>>> (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>>> (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>>>>>> (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>>>>
>>>>>>> Hi Imran,
>>>>>>>
>>>>>>> thanks for your quick reply.
>>>>>>>
>>>>>>> Actually I am doing this:
>>>>>>>
>>>>>>> rddA = rddA.partitionBy(n).cache()
>>>>>>> rddB = rddB.partitionBy(n).cache()
>>>>>>>
>>>>>>> followed by
>>>>>>>
>>>>>>> rddA.count()
>>>>>>> rddB.count()
>>>>>>>
>>>>>>> and then
>>>>>>>
>>>>>>> joinedRDD = rddA.join(rddB)
>>>>>>>
>>>>>>> I thought that the count() would force the evaluation, so any subsequent joins would not need a shuffle. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD), while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though.
>>>>>>>
>>>>>>> The getPartitions method does not exist on the resulting RDD (I am using the Python API). There is, however, foreachPartition(). What is the line
>>>>>>>
>>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>>
>>>>>>> supposed to do?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Karlson
>>>>>>>
>>>>>>> PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first.
>>>>>>>
>>>>>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>>>>>
>>>>>>>> Hi Karlson,
>>>>>>>>
>>>>>>>> I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:
>>>>>>>>
>>>>>>>> val rddA = someData.partitionBy(N)
>>>>>>>> val rddB = someOtherData.partitionBy(N)
>>>>>>>> val joinedRdd = rddA.join(rddB)
>>>>>>>> joinedRdd.count() // or any other action
>>>>>>>>
>>>>>>>> then the partitioning isn't actually getting run until you do the join. So although the join itself doesn't require any additional shuffle, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.
>>>>>>>> If this doesn't help explain things, then for debugging try
>>>>>>>>
>>>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>>>
>>>>>>>> This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)
>>>>>>>>
>>>>>>>> hope this helps,
>>>>>>>> Imran
>>>>>>>>
>>>>>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value).
>>>>>>>>>
>>>>>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex).
>>>>>>>>>
>>>>>>>>> Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040, however, reveals that this assumption is false.
>>>>>>>>>
>>>>>>>>> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>>>>>>>>>
>>>>>>>>> What's the explanation for that behaviour? Where am I wrong with my assumption?
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>>
>>>>>>>>> Karlson
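
For reference, a minimal PySpark sketch of the pattern discussed in this thread: partition both sides with the same number of partitions (and the same partitionFunc), persist and materialize them, then join. The names and sizes below are illustrative only. As Davies notes above, the narrow-dependency join is not implemented in Python at this point (SPARK-5785), and pyspark's join is built by union-ing the two tagged RDDs, so the web UI will still report shuffle activity even though both inputs are co-partitioned.

    from pyspark import SparkContext

    sc = SparkContext(appName="copartitioned-join-sketch")  # illustrative setup

    n = 16  # same number of partitions (and default partitionFunc) on both sides
    rddA = sc.parallelize([(i % 1000, i) for i in range(200000)]).partitionBy(n).cache()
    rddB = sc.parallelize([(i % 1000, -i) for i in range(800000)]).partitionBy(n).cache()

    # Materialize both sides so the partitionBy shuffles happen up front.
    rddA.count()
    rddB.count()

    # In Scala this join could use narrow dependencies and avoid a shuffle;
    # in pyspark (before SPARK-5785) it still shows shuffle read/write in
    # the web UI at http://driver:4040.
    joined = rddA.join(rddB)
    print(joined.count())
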