On 27/04/2016 at 19:41, Joseph Bradley wrote:
Do you have code which can reproduce this performance drop in
treeReduce? It would be helpful to debug. In the 1.6 release, we
profiled it via the various MLlib algorithms and did not see
performance drops.
That would be difficult, but if we cannot find the cause, we'll design a small
example to test it. I first have to check with the latest git version; I'll
have to recompile Spark with the LGPL version of netlib.
It's not just renumbering the partitions; it is reducing the number of
partitions by a factor of 1.0/scale (where scale > 1). This creates a
"tree"-structured aggregation so that more of the work of merging
during aggregation is done on the workers, not the driver.
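For reference, the kind of call being discussed has this shape (a toy sketch, assuming a spark-shell session where sc is already defined):

  // Toy RDD[Double] with 16 partitions; treeAggregate merges the partial
  // results on the executors over `depth` rounds before the final combine
  // on the driver, instead of pulling every partial result to the driver.
  val data = sc.parallelize(1 to 1000, 16).map(_.toDouble)
  val total = data.treeAggregate(0.0)(
    (acc, x) => acc + x,   // seqOp: fold elements within a partition
    (a, b) => a + b,       // combOp: merge partial sums
    depth = 2)             // depth > 1 enables the intermediate tree levels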
Sure, I get that, and it wasn't my point. I just think coalesce also
reduces the number of partitions, without a shuffle, right?
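A quick way to check that on a toy RDD (again in spark-shell, with sc already defined) is to look at the lineage:

  // coalesce(2) with the default shuffle = false builds a narrow dependency:
  // each new partition is a union of parent partitions, so no shuffle stage
  // appears in the lineage (a CoalescedRDD, not a ShuffledRDD).
  val rdd = sc.parallelize(1 to 100, 4)
  println(rdd.coalesce(2).toDebugString)

  // repartition(2), i.e. coalesce(2, shuffle = true), does insert a ShuffledRDD.
  println(rdd.repartition(2).toDebugString)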
_With coalesce:_
Let's say we have 2 workers with 2 partitions each.
W0: p0,p1
W1: p2,p3
Since coalesce tries to avoid shuffling, coalesce(2) should group the
contents of p0 and p1 into p0' (on W0) and p2 and p3 into p1' (on W1).
OTOH, _with the current mapPartitionsWithIndex + modulo + reduceByKey_, let's
say the partitions are indexed like this:
(0,p0),(1,p1),(2,p2),(3,p3)
Then, after the modulo: (0,p0),(1,p1),(0,p2),(1,p3)
As a consequence, W1 will shuffle p2's data to W0 and W0 will shuffle p1's data to W1.
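Concretely, the kind of replacement I have in mind is something like this (an untested sketch, reusing the names from the treeAggregate loop quoted below):

  // Sketch only: shrink the number of partitions with a narrow coalesce and
  // merge inside each new partition, instead of re-keying by
  // (i % curNumPartitions) and going through a reduceByKey shuffle.
  while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated
      .coalesce(curNumPartitions)      // narrow dependency, no shuffle stage
      .mapPartitions { iter =>         // one partial aggregate per new partition
        if (iter.hasNext) Iterator(iter.reduce(cleanCombOp)) else Iterator.empty
      }
  }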
Guillaume
On Wed, Apr 27, 2016 at 4:46 AM, Guillaume Pitel
<guillaume.pi...@exensa.com> wrote:
Hi,
I've been looking at the code of RDD.treeAggregate, because we've
seen a huge performance drop between 1.5.2 and 1.6.1 on a
treeReduce. I think the treeAggregate code hasn't changed, so my
message is not about the performance drop, but a more general
remark about treeAggregate.
In treeAggregate, after the aggregate is applied inside the original
partitions, we enter the tree:
  while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
      (i, iter) => iter.map((i % curNumPartitions, _))
    }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
  }
The two lines where the partitions are re-keyed by their index modulo
curNumPartitions and then reduced by key seem suboptimal to me. They
incur a huge shuffle cost, while a simple coalesce followed by a
partition-level aggregation would probably do the job perfectly well.
Have I missed something that requires this reshuffle?
Best regards
Guillaume Pitel
--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705