Michael - to clarify: preparing and repartitioning the underlying data can't avoid the shuffle introduced by Spark SQL (Yin has explained why), but it does help to reduce network IO.

On 1/21/15 10:01 AM, Yin Huai wrote:
Hello Michael,

In Spark SQL, each physical operator has two internal concepts: Output Partitioning (the partitioning scheme of the operator's output) and Required Child Distribution (the distribution the operator requires of its input data). Let's say we have two operators, parent and child, and the parent takes the output of the child as its input. At the end of the query planning process, whenever the Output Partitioning of the child does not satisfy the Required Child Distribution of the parent, we add an Exchange operator between the parent and child to shuffle the data. Right now, we do not record the partitioning scheme of an input table. So, I think even if you use partitionBy (or DISTRIBUTE BY in SQL) to prepare your data, you will still see the Exchange operator, and your GROUP BY operation will be executed in a new stage (after the Exchange).
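
The rule described above can be sketched as a small self-contained Scala model. All type names here (`TableScan`, `Aggregate`, `Exchange`, `HashPartitioning`, `ClusteredDistribution`) are simplified stand-ins chosen for illustration, not Spark SQL's actual planner classes, and the `satisfies` check is reduced to comparing column sets.

```scala
// Toy model of "add an Exchange when the child's output partitioning does not
// satisfy the parent's required child distribution". Illustrative only.
object ExchangeSketch {
  sealed trait Partitioning
  case object UnknownPartitioning extends Partitioning
  final case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning

  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  final case class ClusteredDistribution(columns: Seq[String]) extends Distribution

  sealed trait Operator { def outputPartitioning: Partitioning }

  // The partitioning scheme of an input table is not recorded, so a scan
  // always reports an unknown partitioning.
  final case class TableScan(table: String) extends Operator {
    def outputPartitioning: Partitioning = UnknownPartitioning
  }

  // An Exchange shuffles its child's rows into the target partitioning.
  final case class Exchange(target: HashPartitioning, child: Operator) extends Operator {
    def outputPartitioning: Partitioning = target
  }

  // A GROUP BY needs all rows with the same grouping key in one partition.
  final case class Aggregate(groupBy: Seq[String], child: Operator) extends Operator {
    def requiredChildDistribution: Distribution = ClusteredDistribution(groupBy)
    def outputPartitioning: Partitioning = child.outputPartitioning
  }

  // Simplified check: hash partitioning on exactly the grouping columns is enough.
  def satisfies(output: Partitioning, required: Distribution): Boolean =
    (output, required) match {
      case (_, UnspecifiedDistribution) => true
      case (HashPartitioning(cols, _), ClusteredDistribution(req)) => cols.toSet == req.toSet
      case _ => false
    }

  // Insert an Exchange below the aggregate only when the requirement is not met.
  def addExchange(agg: Aggregate, shufflePartitions: Int): Aggregate =
    if (satisfies(agg.child.outputPartitioning, agg.requiredChildDistribution)) agg
    else agg.copy(child = Exchange(HashPartitioning(agg.groupBy, shufflePartitions), agg.child))
}
```

In this model, `ExchangeSketch.addExchange(ExchangeSketch.Aggregate(Seq("CustomerCode"), ExchangeSketch.TableScan("Orders")), 200)` always wraps the scan in an `Exchange`, because the scan reports `UnknownPartitioning` no matter how the table was prepared. The value 200 is only an example partition count.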

Making Spark SQL aware of the partitioning scheme of input tables is a useful optimization. I just created https://issues.apache.org/jira/browse/SPARK-5354 to track it.

Thanks,

Yin



On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies <[email protected]> wrote:

    Hi Cheng,

    Are you saying that by setting up the lineage

        schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

    then Spark SQL will know that an SQL “group by” on Customer Code
    will not have to shuffle?

    But the prepared data will already have been shuffled, so we pay
    an upfront cost for future groupings (assuming we cache it, I suppose).

    Mick

    On 20 Jan 2015, at 20:44, Cheng Lian <[email protected]> wrote:

    First of all, even if the underlying dataset is partitioned as
    expected, a shuffle can’t be avoided, because Spark SQL knows
    nothing about the underlying data distribution. However,
    pre-partitioning does reduce network IO.

    You can prepare your data like this (say |CustomerCode| is a
    string field with ordinal 1):

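    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair RDD operations such as partitionBy
    val schemaRdd = sql(...)
    val schema = schemaRdd.schema
    // Key rows by CustomerCode (ordinal 1), hash-partition, then drop the key
    val rows = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values
    // applySchema is a SQLContext method, so the row RDD is passed as an argument
    val prepared = applySchema(rows, schema)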

    |n| should be equal to |spark.sql.shuffle.partitions|.
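
To illustrate why hash-partitioning by key reduces the data that must move later, here is a minimal sketch in plain Scala with no Spark dependency. The `Order` record, the helper names, and the sample data in the usage note are assumptions made for illustration. `partitionOf` uses a non-negative modulus of the key's hash code, which is the general idea behind a hash partitioner.

```scala
// Plain-Scala illustration: once rows are hash-partitioned on the grouping
// key, every key lives in exactly one partition, so each partition can
// compute its group sums without seeing rows from any other partition.
object HashPartitionSketch {
  // Hypothetical order record; field names are illustrative.
  final case class Order(customerCode: String, cost: Double)

  // Map a key to a partition index in [0, n) via a non-negative modulus.
  def partitionOf(key: String, n: Int): Int = {
    val mod = key.hashCode % n
    if (mod < 0) mod + n else mod
  }

  // Split rows into partitions by the hash of CustomerCode.
  def partitionBy(orders: Seq[Order], n: Int): Map[Int, Seq[Order]] =
    orders.groupBy(o => partitionOf(o.customerCode, n))

  // SUM(Cost) GROUP BY CustomerCode, computed independently per partition.
  def sumPerPartition(partitions: Map[Int, Seq[Order]]): Map[Int, Map[String, Double]] =
    partitions.map { case (p, rows) =>
      p -> rows.groupBy(_.customerCode).map { case (code, rs) => code -> rs.map(_.cost).sum }
    }
}
```

For example, calling `HashPartitionSketch.sumPerPartition(HashPartitionSketch.partitionBy(orders, n))` on any `orders` yields per-partition results in which each customer code appears in only one partition, so the per-partition sums are already the final sums. In Spark SQL the Exchange still runs, but rows that are already in the right partition have less distance to travel.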

    Cheng

    On 1/19/15 7:44 AM, Mick Davies wrote:



    Is it possible to use a HashPartitioner or something similar to distribute a
    SchemaRDD's data by the hash of a particular column or set of columns?

    Having done this, I would then hope that GROUP BY could avoid a shuffle.

    E.g. set up a HashPartitioner on the CustomerCode field so that

    SELECT CustomerCode, SUM(Cost)
    FROM Orders
    GROUP BY CustomerCode

    would not need to shuffle.

    Cheers
    Mick









