Dear R-Devel List,
**TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also the attached RRD output) not been doing its job, i.e. it does not actually balance the load. My colleague Dirk Sarpe and I found the cause of the problem and have a patch that fixes it (attached). A similar fix has also been provided [here][2].
[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792
## The Call Chain
First, we traced the relevant R function calls through the code, beginning with
`parLapplyLB`:
1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then
**clusterApplyLB**
2. **splitList:** clusterApply.R:157
3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4. **dynamicClusterApply:** clusterApply.R:39
## splitList
We used both our whiteboard and an R session to manually *run* a few examples, using lists of 100 elements and 5 workers. First, let's take a look at **splitList**:
```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
> sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20
> sapply(parallel:::splitList(1:97, 20), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```
As the examples show, **splitList** splits the list into the requested number of chunks (its second argument) and distributes the elements as equally as possible.
## dynamicClusterApply
**dynamicClusterApply** works this way (simplified):
1. it first gives a chunk to each worker
2. once a worker comes back with the result, it is given the next chunk
**This is the important part:** Load balancing only happens if there are **more** chunks than workers. If there are as many chunks as workers, or fewer, each worker gets **at most one chunk** and there is **no** load balancing.
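The effect can be illustrated without a real cluster. The following is only a sketch of the scheduling idea described above, not the code of **dynamicClusterApply**: the function `simulate_dynamic` and the task costs are made up for illustration, and communication overhead is ignored.

```r
# Simulate greedy dynamic scheduling: each chunk goes to the worker that
# becomes free first. `costs` holds hypothetical per-chunk run times.
simulate_dynamic <- function(costs, n_workers) {
    busy_until <- numeric(n_workers)    # time at which each worker is free
    for (cost in costs) {
        w <- which.min(busy_until)      # first worker to come back
        busy_until[w] <- busy_until[w] + cost
    }
    max(busy_until)                     # total wall-clock time
}

# Assumed workload: one slow task (cost 10) and 19 fast ones (cost 1 each).
task_costs <- c(10, rep(1, 19))

# 5 chunks of 4 tasks on 5 workers: one chunk per worker, nothing to rebalance.
coarse <- sapply(split(task_costs, rep(1:5, each = 4)), sum)
# 20 chunks of 1 task on 5 workers: free workers pick up the remaining chunks.
fine <- task_costs

time_coarse <- simulate_dynamic(coarse, 5)   # 13
time_fine   <- simulate_dynamic(fine, 5)     # 10
```

In this toy workload the coarse split is bound by the chunk that contains the slow task plus three fast ones, while the fine split lets the other four workers absorb all fast tasks.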
## parLapplyLB
This is how **parLapplyLB** splits the input list (with a bit of refactoring,
for readability):
```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)
    # splits X into exactly length(cl) chunks, i.e. one chunk per worker
    chunks <- splitList(X, length(cl))
    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```
For our examples, the chunks have these sizes:
```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```
There we have it: 5 chunks, 5 workers. With this distribution there cannot be any load balancing: each worker is given a single chunk and then sits idle, because no chunks are left.
Instead, **parLapplyLB** should look like this (patch is attached):
```r
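parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)
    # Despite its name, chunkSize is passed as the second argument of
    # splitList, i.e. it is the number of chunks, not their length.
    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
    chunks <- splitList(X, chunkSize)
    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}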
```
Examples with a cluster of 5 workers:
```r
# length(cl) < length(X)
> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
[1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
# length(cl) >= length(X)
> sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```
With this patch, the number of chunks exceeds the number of workers whenever `length(X) > length(cl)^2` (100 elements on 5 workers yield 20 chunks instead of 5), and load balancing can then take effect.
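To see how many chunks each version requests for a given input, the two expressions can be evaluated on their own. This is a small sketch: the helper names `n_chunks_old` and `n_chunks_new` are made up for illustration and are not part of the parallel package, and the values are the counts passed to **splitList**, not measured results.

```r
# Number of chunks requested from splitList before and after the patch.
# Helper names are hypothetical; only the formulas come from the code above.
n_chunks_old <- function(n_items, n_workers) n_workers
n_chunks_new <- function(n_items, n_workers) {
    max(n_workers, ceiling(n_items / n_workers))
}

n_workers <- 5
n_items <- c(4, 20, 25, 26, 100, 1000)
comparison <- data.frame(
    items = n_items,
    old   = sapply(n_items, n_chunks_old, n_workers = n_workers),
    new   = sapply(n_items, n_chunks_new, n_workers = n_workers)
)
print(comparison)
```

For 5 workers, the requested count only rises above 5 once the input has more than 25 elements.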
Best Regards
--
Christian Krause
Scientific Computing Administration and Support
------------------------------------------------------------------------------------------------------------------------
Phone: +49 341 97 33144
Email: [email protected]
------------------------------------------------------------------------------------------------------------------------
German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
Deutscher Platz 5e
04103 Leipzig
Germany
Index: src/library/parallel/R/clusterApply.R
===================================================================
--- src/library/parallel/R/clusterApply.R (revision 74246)
+++ src/library/parallel/R/clusterApply.R (working copy)
@@ -177,9 +177,13 @@
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
cl <- defaultCluster(cl)
+
+ chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
+
+ chunks <- splitList(X, chunkSize)
+
do.call(c,
- clusterApplyLB(cl, x = splitList(X, length(cl)),
- fun = lapply, fun, ...),
+ clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
quote = TRUE)
}
______________________________________________
[email protected] mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel