This will be fixed by https://github.com/apache/spark/pull/4629

On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com> wrote:
> yeah I thought the same thing at first too, I suggested something equivalent
> w/ preservesPartitioning = true, but that isn't enough.  the join is done by
> union-ing the two transformed rdds, which is very different from the way it
> works under the hood in scala to enable narrow dependencies.  It really
> needs a bigger change to pyspark.  I filed this issue:
> https://issues.apache.org/jira/browse/SPARK-5785
>
> (and the somewhat related issue about documentation:
> https://issues.apache.org/jira/browse/SPARK-5786)
>
> partitioning should still work in pyspark, you still need some notion of
> distributing work, and the pyspark functions have a partitionFunc to decide
> that.  But, I am not an authority on pyspark, so perhaps there are more
> holes I'm not aware of ...
>
> Imran
>
> On Fri, Feb 13, 2015 at 8:36 AM, Karlson <ksonsp...@siberie.de> wrote:
>>
>> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
>> wouldn't it help to change the lines
>>
>>     vs = rdd.map(lambda (k, v): (k, (1, v)))
>>     ws = other.map(lambda (k, v): (k, (2, v)))
>>
>> to
>>
>>     vs = rdd.mapValues(lambda v: (1, v))
>>     ws = other.mapValues(lambda v: (2, v))
>>
>> ?
>> As I understand, this would preserve the original partitioning.
>>
>>
>>
>> On 2015-02-13 12:43, Karlson wrote:
>>>
>>> Does that mean partitioning does not work in Python? Or does this only
>>> effect joining?
>>>
>>> On 2015-02-12 19:27, Davies Liu wrote:
>>>>
>>>> The feature works as expected in Scala/Java, but not implemented in
>>>> Python.
>>>>
>>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid <iras...@cloudera.com>
>>>> wrote:
>>>>>
>>>>> I wonder if the issue is that these lines just need to add
>>>>> preservesPartitioning = true
>>>>> ?
>>>>>
>>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>>
>>>>> I am getting the feeling this is an issue w/ pyspark
>>>>>
>>>>>
>>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid <iras...@cloudera.com>
>>>>> wrote:
>>>>>>
>>>>>>
>>>>>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
>>>>>> It
>>>>>> could be that pyspark doesn't properly support narrow dependencies, or
>>>>>> maybe
>>>>>> you need to be more explicit about the partitioner.  I am looking into
>>>>>> the
>>>>>> pyspark api but you might have some better guesses here than I
>>>>>> thought.
>>>>>>
>>>>>> My suggestion to do
>>>>>>
>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>>>>>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
>>>>>> fields
>>>>>> are hidden deeper inside and are not user-visible.  But I think a
>>>>>> better way
>>>>>> (in scala, anyway) is to look at rdd.dependencies.  its a little
>>>>>> tricky,
>>>>>> though, you need to look deep into the lineage (example at the end).
>>>>>>
>>>>>> Sean -- yes it does require both RDDs have the same partitioner, but
>>>>>> that
>>>>>> should happen naturally if you just specify the same number of
>>>>>> partitions,
>>>>>> you'll get equal HashPartitioners.  There is a little difference in
>>>>>> the
>>>>>> scala & python api that I missed here.  For partitionBy in scala, you
>>>>>> actually need to specify the partitioner, but not in python.  However
>>>>>> I
>>>>>> thought it would work like groupByKey, which does just take an int.
>>>>>>
>>>>>>
>>>>>> Here's a code example in scala -- not sure what is available from
>>>>>> python.
>>>>>> Hopefully somebody knows a simpler way to confirm narrow
>>>>>> dependencies??
>>>>>>
>>>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x ->
>>>>>>> x}.groupByKey(64)
>>>>>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>>>>>> x}.groupByKey(64)
>>>>>>> scala> d.partitioner == d2.partitioner
>>>>>>> res2: Boolean = true
>>>>>>> val joined = d.join(d2)
>>>>>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
>>>>>>> x}.groupByKey(100)
>>>>>>> val badJoined = d.join(d3)
>>>>>>>
>>>>>>> d.setName("d")
>>>>>>> d2.setName("d2")
>>>>>>> d3.setName("d3")
>>>>>>> joined.setName("joined")
>>>>>>> badJoined.setName("badJoined")
>>>>>>>
>>>>>>>
>>>>>>> //unfortunatley, just looking at the immediate dependencies of joined
>>>>>>> &
>>>>>>> badJoined is misleading, b/c join actually creates
>>>>>>> // one more step after the shuffle
>>>>>>> scala> joined.dependencies
>>>>>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>>>>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>>>>>> //even with the join that does require a shuffle, we still see a
>>>>>>> OneToOneDependency, but thats just a simple flatMap step
>>>>>>> scala> badJoined.dependencies
>>>>>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>>>>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>  //so lets make a helper function to get all the dependencies
>>>>>>> recursively
>>>>>>>
>>>>>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>>>>>   val deps = rdd.dependencies
>>>>>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> //full dependencies of the good join
>>>>>>>
>>>>>>> scala> flattenDeps(joined).foreach{println}
>>>>>>> (joined FlatMappedValuesRDD[9] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
>>>>>>> (MappedValuesRDD[8] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@623264af)
>>>>>>> (CoGroupedRDD[7] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
>>>>>>> (CoGroupedRDD[7] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@37514cd)
>>>>>>> (d ShuffledRDD[3] at groupByKey at
>>>>>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>>> (MappedRDD[2] at map at
>>>>>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>>> (d2 ShuffledRDD[6] at groupByKey at
>>>>>>> <console>:12,org.apache.spark.ShuffleDependency@5960236d)
>>>>>>> (MappedRDD[5] at map at
>>>>>>> <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> //full dependencies of the bad join -- notice the ShuffleDependency!
>>>>>>>
>>>>>>> scala> flattenDeps(badJoined).foreach{println}
>>>>>>> (badJoined FlatMappedValuesRDD[15] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
>>>>>>> (MappedValuesRDD[14] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
>>>>>>> (CoGroupedRDD[13] at join at
>>>>>>> <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
>>>>>>> (CoGroupedRDD[13] at join at
>>>>>>> <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
>>>>>>> (d ShuffledRDD[3] at groupByKey at
>>>>>>> <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
>>>>>>> (MappedRDD[2] at map at
>>>>>>> <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
>>>>>>> (d3 ShuffledRDD[12] at groupByKey at
>>>>>>> <console>:12,org.apache.spark.ShuffleDependency@d794984)
>>>>>>> (MappedRDD[11] at map at
>>>>>>> <console>:12,org.apache.spark.OneToOneDependency@15c98005)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de>
>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi Imran,
>>>>>>>
>>>>>>> thanks for your quick reply.
>>>>>>>
>>>>>>> Actually I am doing this:
>>>>>>>
>>>>>>>     rddA = rddA.partitionBy(n).cache()
>>>>>>>     rddB = rddB.partitionBy(n).cache()
>>>>>>>
>>>>>>> followed by
>>>>>>>
>>>>>>>     rddA.count()
>>>>>>>     rddB.count()
>>>>>>>
>>>>>>> then joinedRDD = rddA.join(rddB)
>>>>>>>
>>>>>>> I thought that the count() would force the evaluation, so any
>>>>>>> subsequent
>>>>>>> joins would be shuffleless. I was wrong about the shuffle amounts
>>>>>>> however.
>>>>>>> The shuffle write is actually 2GB (i.e. the size of the bigger RDD)
>>>>>>> whil
>>>>>>> there is no Shuffle read. A joinedRdd.count() does a shuffle read of
>>>>>>> about
>>>>>>> 1GB in size, though.
>>>>>>>
>>>>>>> The getPartitions-method does not exist on the resulting RDD (I am
>>>>>>> using
>>>>>>> the Python API). There is however foreachPartition(). What is the
>>>>>>> line
>>>>>>>
>>>>>>>     joinedRdd.getPartitions.foreach{println}
>>>>>>>
>>>>>>> supposed to do?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Karlson
>>>>>>>
>>>>>>> PS: Sorry for sending this twice, I accidentally did not reply to the
>>>>>>> mailing list first.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 2015-02-12 16:48, Imran Rashid wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi Karlson,
>>>>>>>>
>>>>>>>> I think your assumptions are correct -- that join alone shouldn't
>>>>>>>> require
>>>>>>>> any shuffling.  But its possible you are getting tripped up by lazy
>>>>>>>> evaluation of RDDs.  After you do your partitionBy, are you sure
>>>>>>>> those
>>>>>>>> RDDs
>>>>>>>> are actually materialized & cached somewhere?  eg., if you just did
>>>>>>>> this:
>>>>>>>>
>>>>>>>> val rddA = someData.partitionBy(N)
>>>>>>>> val rddB = someOtherData.partitionBy(N)
>>>>>>>> val joinedRdd = rddA.join(rddB)
>>>>>>>> joinedRdd.count() //or any other action
>>>>>>>>
>>>>>>>> then the partitioning isn't actually getting run until you do the
>>>>>>>> join.
>>>>>>>> So
>>>>>>>> though the join itself can happen without partitioning,
>>>>>>>> joinedRdd.count()
>>>>>>>> will trigger the evaluation of rddA & rddB which will require
>>>>>>>> shuffles.
>>>>>>>> Note that even if you have some intervening action on rddA & rddB
>>>>>>>> that
>>>>>>>> shuffles them, unless you persist the result, you will need to
>>>>>>>> reshuffle
>>>>>>>> them for the join.
>>>>>>>>
>>>>>>>> If this doesn't help explain things, for debugging
>>>>>>>>
>>>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>>>
>>>>>>>> this is getting into the weeds, but at least this will tell us
>>>>>>>> whether
>>>>>>>> or
>>>>>>>> not you are getting narrow dependencies, which would avoid the
>>>>>>>> shuffle.
>>>>>>>>  (Does anyone know of a simpler way to check this?)
>>>>>>>>
>>>>>>>> hope this helps,
>>>>>>>> Imran
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> using Pyspark, I create two RDDs (one with about 2M records
>>>>>>>>> (~200MB),
>>>>>>>>> the
>>>>>>>>> other with about 8M records (~2GB)) of the format (key, value).
>>>>>>>>>
>>>>>>>>> I've done a partitionBy(num_partitions) on both RDDs and verified
>>>>>>>>> that
>>>>>>>>> both RDDs have the same number of partitions and that equal keys
>>>>>>>>> reside
>>>>>>>>> on
>>>>>>>>> the same partition (via mapPartitionsWithIndex).
>>>>>>>>>
>>>>>>>>> Now I'd expect that for a join on the two RDDs no shuffling is
>>>>>>>>> necessary.
>>>>>>>>> Looking at the Web UI under http://driver:4040 however reveals that
>>>>>>>>> that
>>>>>>>>> assumption is false.
>>>>>>>>>
>>>>>>>>> In fact I am seeing shuffle writes of about 200MB and reads of
>>>>>>>>> about
>>>>>>>>> 50MB.
>>>>>>>>>
>>>>>>>>> What's the explanation for that behaviour? Where am I wrong with my
>>>>>>>>> assumption?
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>>
>>>>>>>>> Karlson
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to