On 27/04/2016 19:41, Joseph Bradley wrote:
Do you have code which can reproduce this performance drop in treeReduce? It would be helpful to debug. In the 1.6 release, we profiled it via the various MLlib algorithms and did not see performance drops.
That would be difficult, but if we cannot find the cause, we'll design a small example to test it. I first have to check with the latest git version; I have to recompile Spark with the LGPL version of netlib.

It's not just renumbering the partitions; it is reducing the number of partitions by a factor of 1.0/scale (where scale > 1). This creates a "tree"-structured aggregation so that more of the work of merging during aggregation is done on the workers, not the driver.

Sure, I get that, and it wasn't my point. I just think coalesce also reduces the number of partitions, without a shuffle, right?

_With coalesce:_
Let's say we have 2 workers with 2 partitions each.

W0: p0,p1
W1: p2,p3

Since coalesce tries to avoid shuffling, coalesce(2) should group the contents of p0 and p1 into p0' (on W0) and p2 and p3 into p1' (on W1).

OTOH, _with the current mapPartitionsWithIndex + modulo + reduceByKey_, let's say the partitions are numbered like this:

(0,p0),(1,p1),(2,p2),(3,p3)

Then after the modulo, (0,p0),(1,p1),(0,p2),(1,p3)

As a consequence, W1 will shuffle p2 to W0 and W0 will shuffle p1 to W1.
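
To make this concrete, here is a rough, untested sketch of the coalesce-based variant I have in mind, written at user level rather than as a patch to RDD.scala. The function and names (coalesceTreeReduce, combOp, partials) are just for illustration; combOp plays the role of cleanCombOp in the current code.

  import org.apache.spark.rdd.RDD
  import scala.reflect.ClassTag

  // Hypothetical coalesce-based tree reduce (user-level sketch, not the actual
  // Spark implementation). Each level shrinks the partition count with
  // coalesce(shuffle = false) and merges the partials locally.
  def coalesceTreeReduce[T: ClassTag](rdd: RDD[T], combOp: (T, T) => T, scale: Int): T = {
    // One partial aggregate per original partition, as treeAggregate already does.
    var partials = rdd.mapPartitions(it =>
      if (it.hasNext) Iterator(it.reduce(combOp)) else Iterator.empty)
    var numPartitions = partials.partitions.length
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      partials = partials
        .coalesce(numPartitions, shuffle = false)  // groups co-located partitions, no shuffle
        .mapPartitions(it =>
          if (it.hasNext) Iterator(it.reduce(combOp)) else Iterator.empty)
    }
    partials.reduce(combOp)
  }

The intent is that each level's merge still happens on the executors, with coalesce grouping co-located partials so that little or no data should cross the network before the final reduce on the driver.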

Guillaume

On Wed, Apr 27, 2016 at 4:46 AM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:

    Hi,

    I've been looking at the code of RDD.treeAggregate, because we've
    seen a huge performance drop between 1.5.2 and 1.6.1 on a
    treeReduce. I think the treeAggregate code hasn't changed, so my
    message is not about the performance drop, but a more general
    remark about treeAggregate.

    In treeAggregate, after the aggregation is applied inside the original
    partitions, we enter the tree:


        while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
          numPartitions /= scale
          val curNumPartitions = numPartitions
          partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
            (i, iter) => iter.map((i % curNumPartitions, _))
          }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
        }
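
    (As a rough illustration of what this loop does, assuming purely as an
    example 64 partitions after the first per-partition aggregation and
    scale = 4, the levels would go 64 -> 16 -> 4; in plain Scala:)

        // Plain Scala sketch (not Spark): partition counts per tree level,
        // assuming 64 initial partitions and scale = 4.
        var numPartitions = 64
        val scale = 4
        while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
          numPartitions /= scale
          println(s"next level: $numPartitions partitions") // prints 16, then 4
        }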


    The two lines where the partitions are numbered, then renumbered, then
    reduced by key seem suboptimal to me. There is a huge shuffle cost, while
    a simple coalesce followed by a partition-level aggregation would
    probably do the job perfectly.

    Have I missed something that requires this reshuffle?

    Best regards
    Guillaume Pitel




--
eXenSa

*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
