I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:

> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
> could be that pyspark doesn't properly support narrow dependencies, or
> maybe you need to be more explicit about the partitioner.  I am looking
> into the pyspark api but you might have some better guesses here than I
> thought.
>
> My suggestion to do
>
> joinedRdd.getPartitions.foreach{println}
>
> was just to see if the partition was a NarrowCoGroupSplitDep or a
> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
> fields are hidden deeper inside and are not user-visible.  But I think a
> better way (in scala, anyway) is to look at rdd.dependencies.  its a little
> tricky, though, you need to look deep into the lineage (example at the end).
>
> Sean -- yes it does require both RDDs have the same partitioner, but that
> should happen naturally if you just specify the same number of partitions,
> you'll get equal HashPartitioners.  There is a little difference in the
> scala & python api that I missed here.  For partitionBy in scala, you
> actually need to specify the partitioner, but not in python.  However I
> thought it would work like groupByKey, which does just take an int.
>
>
> Here's a code example in scala -- not sure what is available from python.
> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>
> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> scala> d.partitioner == d2.partitioner
>> res2: Boolean = true
>> val joined = d.join(d2)
>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>> val badJoined = d.join(d3)
>>
>> d.setName("d")
>> d2.setName("d2")
>> d3.setName("d3")
>> joined.setName("joined")
>> badJoined.setName("badJoined")
>
>
>> //unfortunatley, just looking at the immediate dependencies of joined &
>> badJoined is misleading, b/c join actually creates
>> // one more step after the shuffle
>> scala> joined.dependencies
>> res20: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@74751ac8)
>> //even with the join that does require a shuffle, we still see a
>> OneToOneDependency, but thats just a simple flatMap step
>> scala> badJoined.dependencies
>> res21: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>
>
>
>>  //so lets make a helper function to get all the dependencies recursively
>>
> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>   val deps = rdd.dependencies
>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>> }
>>
>>
>> //full dependencies of the good join
>
> scala> flattenDeps(joined).foreach{println}
>> (joined FlatMappedValuesRDD[9] at join at
>> <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>> (MappedValuesRDD[8] at join at
>> <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>
>> *(CoGroupedRDD[7] at join at
>> <console>:16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7]
>> at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>> (d ShuffledRDD[3] at groupByKey at
>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d2 ShuffledRDD[6] at groupByKey at
>> <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>> (MappedRDD[5] at map at
>> <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>
>>
>
>> //full dependencies of the bad join -- notice the ShuffleDependency!
>
> scala> flattenDeps(badJoined).foreach{println}
>> (badJoined FlatMappedValuesRDD[15] at join at
>> <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>> (MappedValuesRDD[14] at join at
>> <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>
>> *(CoGroupedRDD[13] at join at
>> <console>:16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13]
>> at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>> (d ShuffledRDD[3] at groupByKey at
>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d3 ShuffledRDD[12] at groupByKey at
>> <console>:12,org.apache.spark.ShuffleDependency@d794984)
>> (MappedRDD[11] at map at
>> <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>
>
>
> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:
>
>> Hi Imran,
>>
>> thanks for your quick reply.
>>
>> Actually I am doing this:
>>
>>     rddA = rddA.partitionBy(n).cache()
>>     rddB = rddB.partitionBy(n).cache()
>>
>> followed by
>>
>>     rddA.count()
>>     rddB.count()
>>
>> then joinedRDD = rddA.join(rddB)
>>
>> I thought that the count() would force the evaluation, so any subsequent
>> joins would be shuffleless. I was wrong about the shuffle amounts however.
>> The shuffle write is actually 2GB (i.e. the size of the bigger RDD) whil
>> there is no Shuffle read. A joinedRdd.count() does a shuffle read of about
>> 1GB in size, though.
>>
>> The getPartitions-method does not exist on the resulting RDD (I am using
>> the Python API). There is however foreachPartition(). What is the line
>>
>>     joinedRdd.getPartitions.foreach{println}
>>
>> supposed to do?
>>
>> Thank you,
>>
>> Karlson
>>
>> PS: Sorry for sending this twice, I accidentally did not reply to the
>> mailing list first.
>>
>>
>>
>> On 2015-02-12 16:48, Imran Rashid wrote:
>>
>>> Hi Karlson,
>>>
>>> I think your assumptions are correct -- that join alone shouldn't require
>>> any shuffling.  But its possible you are getting tripped up by lazy
>>> evaluation of RDDs.  After you do your partitionBy, are you sure those
>>> RDDs
>>> are actually materialized & cached somewhere?  eg., if you just did this:
>>>
>>> val rddA = someData.partitionBy(N)
>>> val rddB = someOtherData.partitionBy(N)
>>> val joinedRdd = rddA.join(rddB)
>>> joinedRdd.count() //or any other action
>>>
>>> then the partitioning isn't actually getting run until you do the join.
>>> So
>>> though the join itself can happen without partitioning, joinedRdd.count()
>>> will trigger the evaluation of rddA & rddB which will require shuffles.
>>> Note that even if you have some intervening action on rddA & rddB that
>>> shuffles them, unless you persist the result, you will need to reshuffle
>>> them for the join.
>>>
>>> If this doesn't help explain things, for debugging
>>>
>>> joinedRdd.getPartitions.foreach{println}
>>>
>>> this is getting into the weeds, but at least this will tell us whether or
>>> not you are getting narrow dependencies, which would avoid the shuffle.
>>>  (Does anyone know of a simpler way to check this?)
>>>
>>> hope this helps,
>>> Imran
>>>
>>>
>>>
>>>
>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>>
>>>  Hi All,
>>>>
>>>> using Pyspark, I create two RDDs (one with about 2M records (~200MB),
>>>> the
>>>> other with about 8M records (~2GB)) of the format (key, value).
>>>>
>>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>>> both RDDs have the same number of partitions and that equal keys reside
>>>> on
>>>> the same partition (via mapPartitionsWithIndex).
>>>>
>>>> Now I'd expect that for a join on the two RDDs no shuffling is
>>>> necessary.
>>>> Looking at the Web UI under http://driver:4040 however reveals that
>>>> that
>>>> assumption is false.
>>>>
>>>> In fact I am seeing shuffle writes of about 200MB and reads of about
>>>> 50MB.
>>>>
>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>> assumption?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Karlson
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

Reply via email to