I've looked in more detail at the current two MPI_Alltoallv algorithms
and wanted to raise a couple of ideas.
Firstly, the new default "pairwise" algorithm.
* There is no optimisation for sparse/empty messages, compared to the old
basic "linear" algorithm.
* The attached "pairwise-nop" patch adds this optimisation and, on the
test case I first described in this thread (thousands of small, sparse
all-to-all exchanges), this cuts runtime by approximately 30%.
* I think the upper bound on the loop counter for pairwise exchange is
off-by-one. As the comment notes, "starting from 1 since local exhange
[sic] is done"; but on the final iteration, when step = size, the
sendto/recvfrom both reduce to rank (self-exchange is already handled in
earlier code), so that last iteration is a redundant self-exchange.
The pairwise algorithm still kills performance on my gigabit ethernet
network. My message transmission time must be small compared to latency,
and the forced synchronisation steps (MPI_Comm_size() of them) introduce a
minimum delay (single_link_latency * comm_size), i.e. latency scales linearly
with comm_size. The linear algorithm doesn't wait for each exchange, so
its minimum latency is just a single transmit/receive.
Which brings me to the second idea. The problem with the existing
implementation of the linear algorithm is that the irecv/isend pattern
is identical on all processes, meaning that every process starts by
having to wait for process 0 to send to everyone and every process
finishes by waiting for rank (size-1) to send to everyone.
It seems preferable to at least post the send/recv requests in the same
order as the pairwise algorithm. The attached "linear-alltoallv" patch
implements this and, on my test case, shows a modest 5% improvement.
I was wondering if it would address the concerns which led to the switch
of default algorithm.
Simon
diff -r '--exclude=*~' -u openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_alltoallv.c openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_alltoallv.c
--- openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_alltoallv.c 2012-04-03 15:30:17.000000000 +0100
+++ openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_alltoallv.c 2013-01-24 15:12:13.299568308 +0000
@@ -70,7 +70,7 @@
}
/* Perform pairwise exchange starting from 1 since local exhange is done */
- for (step = 1; step < size + 1; step++) {
+ for (step = 1; step < size; step++) {
/* Determine sender and receiver for this step. */
sendto = (rank + step) % size;
diff -r '--exclude=*~' -u openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_util.c openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_util.c
--- openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_util.c 2012-04-03 15:30:17.000000000 +0100
+++ openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_util.c 2013-01-24 15:11:56.562118400 +0000
@@ -37,25 +37,31 @@
ompi_status_public_t* status )
{ /* post receive first, then send, then waitall... should be fast (I hope) */
- int err, line = 0;
+ int err, line = 0, nreq = 0;
ompi_request_t* reqs[2];
ompi_status_public_t statuses[2];
- /* post new irecv */
- err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
- comm, &reqs[0]));
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
-
- /* send data to children */
- err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag,
- MCA_PML_BASE_SEND_STANDARD, comm, &reqs[1]));
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ if (0 != rcount) {
+ /* post new irecv */
+ err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
+ comm, &reqs[nreq++]));
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ }
- err = ompi_request_wait_all( 2, reqs, statuses );
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler_waitall; }
+ if (0 != scount) {
+ /* send data to children */
+ err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag,
+ MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreq++]));
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ }
- if (MPI_STATUS_IGNORE != status) {
- *status = statuses[0];
+ if (0 != nreq) {
+ err = ompi_request_wait_all( nreq, reqs, statuses );
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler_waitall; }
+
+ if (MPI_STATUS_IGNORE != status) {
+ *status = statuses[0];
+ }
}
return (MPI_SUCCESS);
@@ -68,7 +74,7 @@
if( MPI_ERR_IN_STATUS == err ) {
/* At least we know he error was detected during the wait_all */
int err_index = 0;
- if( MPI_SUCCESS != statuses[1].MPI_ERROR ) {
+ if( nreq > 1 && MPI_SUCCESS != statuses[1].MPI_ERROR ) {
err_index = 1;
}
if (MPI_STATUS_IGNORE != status) {
@@ -107,25 +113,31 @@
ompi_status_public_t* status )
{ /* post receive first, then [local] sync send, then wait... should be fast (I hope) */
- int err, line = 0;
+ int err, line = 0, nreq = 0;
ompi_request_t* req[2];
ompi_status_public_t statuses[2];
- /* post new irecv */
- err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
- comm, &(req[0])));
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
-
- /* send data to children */
- err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag,
- MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &(req[1])));
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ if (0 != rcount) {
+ /* post new irecv */
+ err = MCA_PML_CALL(irecv( recvbuf, rcount, rdatatype, source, rtag,
+ comm, &(req[nreq++])));
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ }
+
+ if (0 != scount) {
+ /* send data to children */
+ err = MCA_PML_CALL(isend( sendbuf, scount, sdatatype, dest, stag,
+ MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &(req[nreq++])));
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ }
- err = ompi_request_wait_all( 2, req, statuses );
- if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
+ if (0 != nreq) {
+ err = ompi_request_wait_all( nreq, req, statuses );
+ if (err != MPI_SUCCESS) { line = __LINE__; goto error_handler; }
- if (MPI_STATUS_IGNORE != status) {
- *status = statuses[0];
+ if (MPI_STATUS_IGNORE != status) {
+ *status = statuses[0];
+ }
}
return (MPI_SUCCESS);
@@ -137,7 +149,7 @@
*/
if( MPI_ERR_IN_STATUS == err ) {
int err_index = 0;
- if( MPI_SUCCESS != statuses[1].MPI_ERROR ) {
+ if( nreq > 1 && MPI_SUCCESS != statuses[1].MPI_ERROR ) {
err_index = 1;
}
if (MPI_STATUS_IGNORE != status) {
diff -r '--exclude=*~' -u openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_alltoallv.c openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_alltoallv.c
--- openmpi-1.6.3/ompi/mca/coll/tuned/coll_tuned_alltoallv.c 2012-04-03 15:30:17.000000000 +0100
+++ openmpi-1.6.3.patched/ompi/mca/coll/tuned/coll_tuned_alltoallv.c 2013-01-24 15:13:25.838528015 +0000
@@ -118,6 +118,7 @@
int i, size, rank, err;
char *psnd, *prcv;
int nreqs;
+ int sendto, recvfrom;
ptrdiff_t sext, rext;
MPI_Request *preq;
mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
@@ -154,13 +155,14 @@
/* Post all receives first */
for (i = 0; i < size; ++i) {
- if (i == rank || 0 == rcounts[i]) {
+ recvfrom = (rank + size - i) % size;
+ if (recvfrom == rank || 0 == rcounts[recvfrom]) {
continue;
}
- prcv = ((char *) rbuf) + (rdisps[i] * rext);
- err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
- i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
+ prcv = ((char *) rbuf) + (rdisps[recvfrom] * rext);
+ err = MCA_PML_CALL(irecv_init(prcv, rcounts[recvfrom], rdtype,
+ recvfrom, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
preq++));
++nreqs;
if (MPI_SUCCESS != err) {
@@ -171,13 +173,14 @@
/* Now post all sends */
for (i = 0; i < size; ++i) {
- if (i == rank || 0 == scounts[i]) {
+ sendto = (rank + i) % size;
+ if (sendto == rank || 0 == scounts[sendto]) {
continue;
}
- psnd = ((char *) sbuf) + (sdisps[i] * sext);
- err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
- i, MCA_COLL_BASE_TAG_ALLTOALLV,
+ psnd = ((char *) sbuf) + (sdisps[sendto] * sext);
+ err = MCA_PML_CALL(isend_init(psnd, scounts[sendto], sdtype,
+ sendto, MCA_COLL_BASE_TAG_ALLTOALLV,
MCA_PML_BASE_SEND_STANDARD, comm,
preq++));
++nreqs;