Hi,

I'm testing the non-blocking collectives of Open MPI 1.8.

I have two nodes connected with InfiniBand, and I perform an allgather on 128MB of data in total.

I split the 128MB of data into eight pieces, and in each iteration I perform
the computation and the MPI_Iallgatherv() for one piece, hoping that the
MPI_Iallgatherv() of the previous iteration can be overlapped with the
computation of the current iteration. MPI_Wait() is called on every
outstanding request after the last iteration.

However, the total communication time (including the final wait time) is
similar to that of the traditional blocking MPI_Allgatherv, or even slightly
higher; the non-blocking version only seems to move part of the time from the
Iallgatherv calls into the final wait.


The test pseudo-code follows; the full source code is attached.

===========================

Using MPI_Allgatherv:

for( i=0; i<8; i++ )
{
  // computation
    mytime( t_begin );
    computation;
    mytime( t_end );
    comp_time += (t_end - t_begin);

  // communication
    t_begin = t_end;
    MPI_Allgatherv();
    mytime( t_end );
    comm_time += (t_end - t_begin);
}
--------------------------------------------

Using MPI_Iallgatherv:

for( i=0; i<8; i++ )
{
  // computation
    mytime( t_begin );
    computation;
    mytime( t_end );
    comp_time += (t_end - t_begin);

  // communication
    t_begin = t_end;
    MPI_Iallgatherv( ..., &request[i] );
    mytime( t_end );
    comm_time += (t_end - t_begin);
}

// wait for non-blocking allgather to complete
mytime( t_begin );
for( i=0; i<8; i++ )
    MPI_Wait( &request[i] );
mytime( t_end );
wait_time = t_end - t_begin;

==============================

The results with Allgatherv are:
[cmy@gnode102 test_nbc]$ /home3/cmy/czh/opt/ompi-1.8/bin/mpirun -n 2 --host
gnode102,gnode103 ./Allgatherv 128 2 | grep time
Computation time  :     8481279 us
Communication time:     319803 us

The results with Iallgatherv are:
[cmy@gnode102 test_nbc]$ /home3/cmy/czh/opt/ompi-1.8/bin/mpirun -n 2 --host
gnode102,gnode103 ./Iallgatherv 128 2 | grep time
Computation time  :     8479177 us
Communication time:     199046 us
Wait time:              139841 us


So, does this mean that the current Open MPI implementation of
MPI_Iallgatherv doesn't support offloading the collective communication to
dedicated cores or to the network interface?
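
As far as I understand, Open MPI's non-blocking collectives (the coll/libnbc
component) may only make progress while the process is inside an MPI call,
unless asynchronous progress is available. If that is what happens here, one
variant I could try is to call MPI_Test() on the outstanding request from
inside the computation loop, to give the pending Iallgatherv a chance to
progress. A rough sketch, using the same variables as in the attached
Iallgatherv code (testing every 4096 elements is an arbitrary choice):

for( k=0; k<NS; k++ )
{
  // computation on the kth piece, occasionally poking the MPI library
    for( i=k*local_size/NS; i<(k+1)*local_size/NS; i++ )
    {
        local_buf[i] = rank*local_size + i;
        if( k>0 && (i & 4095)==0 )  // drive progress of the previous request
        {
            int flag;
            MPI_Test( &request[k-1], &flag, MPI_STATUS_IGNORE );
        }
    }

  // start the non-blocking allgather on the kth piece
    for( i=0; i<size; i++ )
        displs[k*size+i] = i*local_size + k*local_size/NS;
    MPI_Iallgatherv( local_buf + k*local_size/NS, local_size/NS, MPI_INT,
                     global_buf, recvcnts, &displs[k*size], MPI_INT,
                     MPI_COMM_WORLD, &request[k] );
}

for( k=0; k<NS; k++ )
    MPI_Wait( &request[k], MPI_STATUS_IGNORE );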

Best regards,
Zehan
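
---------- Allgatherv.c (blocking version) ----------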
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>   // gettimeofday()


#define NS  8   //  number of segments

struct timeval tv;
#define mytime(time)   do{  \
    gettimeofday(&tv,NULL);  \
    time=(unsigned long)tv.tv_sec*1000000+tv.tv_usec;    \
}while(0)

int main(int argc, char** argv)
{
    MPI_Init(&argc,&argv);
    
    int size;
    int rank;

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(argc<2) {
        printf("Usage: %s m [n]\n", argv[0]);
        printf("        n=1: m is in KB\n");
        printf("        n=2: m is in MB\n");
        exit(-1);
    }

    int global_size;    // the amount of data to allgather
    int local_size;    // the amount of data that each process holds
    if(argc >= 2) global_size = atoi(argv[1]);
    if(argc >= 3)
    {
        if(atoi(argv[2])==2) global_size = global_size*1024*1024;   // n=2, xxMB
        if(atoi(argv[2])==1) global_size = global_size*1024;        // n=1, xxKB
    }
    local_size = global_size/size;  // each process holds 1/size of the data

    int * global_buf;   // recvbuf
    int * local_buf;    // sendbuf
    global_buf = (int *) malloc(global_size*sizeof(int));
    local_buf = (int *) malloc(local_size*sizeof(int));     
    memset(global_buf,0,global_size*sizeof(int));
    memset(local_buf,0,local_size*sizeof(int));

    int i,j,k;

    int *recvcnts;  // recvcnts of MPI_Allgatherv
    int *displs;    // displs of MPI_Allgatherv
    recvcnts = (int *) malloc(size*sizeof(int));
    displs = (int*) malloc(size*sizeof(int));
    for(i=0; i<size; i++)
        recvcnts[i] = local_size/NS;    // each time perform MPI_Allgatherv on 1/NS of the data


    unsigned long t_begin,t_end,comp_time,comm_time;    // timers
    comp_time = 0;
    comm_time = 0;

    /************** MAIN LOOP ****************/

    for(k=0;k<NS;k++)  // NS iterations
    {  
        // computation 
        mytime(t_begin);    // begin of computation
        for(i=k*local_size/NS;i<(k+1)*local_size/NS;i++)    // initialize the kth part of local data
        {
            local_buf[i] = rank*local_size + i;
            for(j=0;j<40;j++);  // artificial delay (may be optimized away by the compiler)
        }
        mytime(t_end);      // end of computation
        comp_time += (t_end - t_begin);

        // communication
        t_begin = t_end;    // begin of communication
        for(i=0;i<size;i++)
            displs[i] = i*local_size + k*local_size/NS;     // calculate the displacement of each process in the recvbuf at kth iteration
        int *offset=local_buf + k*local_size/NS;     // the start address of sendbuf
        MPI_Allgatherv(offset, local_size/NS, MPI_INT, global_buf, recvcnts, displs, MPI_INT, MPI_COMM_WORLD);
        mytime(t_end);      // end of communication
        comm_time += (t_end - t_begin);
    }

    /********************************************/

    int err = 0;
    for(i=0;i<global_size;i++)  // check the recvbuf
        if( global_buf[i] != i ) err++;
    if( err )
        printf("rank[%d] failed\n",rank);
    else
        printf("rank[%d] success\n",rank);

    if(rank==0) // print the final result
    {
        printf("Number of processes:    %d\n",size);
        printf("Computation time  :     %d us\n",comp_time);
        printf("Communication time:     %d us\n",comm_time);
    }
    MPI_Finalize();
    return 0;
}
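
---------- Iallgatherv.c (non-blocking version) ----------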
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>   // gettimeofday()


#define NS  8   //  number of segments

struct timeval tv;
#define mytime(time)   do{  \
    gettimeofday(&tv,NULL);  \
    time=(unsigned long)tv.tv_sec*1000000+tv.tv_usec;    \
}while(0)

int main(int argc, char** argv)
{
    MPI_Init(&argc,&argv);
    
    int size;
    int rank;

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(argc<2) {
        printf("Usage: %s m [n]\n", argv[0]);
        printf("        n=1: m is in KB\n");
        printf("        n=2: m is in MB\n");
        exit(-1);
    }

    int global_size;    // the amount of data to allgather
    int local_size;    // the amount of data that each process holds
    if(argc >= 2) global_size = atoi(argv[1]);
    if(argc >= 3)
    {
        if(atoi(argv[2])==2) global_size = global_size*1024*1024;   // n=2, xxMB
        if(atoi(argv[2])==1) global_size = global_size*1024;        // n=1, xxKB
    }
    local_size = global_size/size;

    int * global_buf;   // recvbuf
    int * local_buf;    // sendbuf
    global_buf = (int *) malloc(global_size*sizeof(int));
    local_buf = (int *) malloc(local_size*sizeof(int));
    memset(global_buf,0,global_size*sizeof(int));
    memset(local_buf,0,local_size*sizeof(int));

    int i,j,k;

    int *recvcnts;  // recvcnts of MPI_Iallgatherv
    int *displs;    // displs of MPI_Iallgatherv
    recvcnts = (int *) malloc(size*sizeof(int));
    displs = (int*) malloc(NS*size*sizeof(int));    // one displs block per segment: the displs passed to a pending Iallgatherv must not be modified before it completes
    for(i=0; i<size; i++)
        recvcnts[i] = local_size/NS;    // each time perform MPI_Iallgatherv on 1/NS of the data

    // request and status
    MPI_Request *request = (MPI_Request*) malloc(NS*sizeof(MPI_Request));
    MPI_Status  *status  = (MPI_Status*)  malloc(NS*sizeof(MPI_Status));

    
    unsigned long t_begin,t_end,comp_time,comm_time,wait_time;  //timers
    comp_time = 0;
    comm_time = 0;
    wait_time = 0;

    /************** MAIN LOOP ****************/

    for(k=0;k<NS;k++)  // NS iterations
    {
        // computation   
        mytime(t_begin);    // begin of computation
        for(i=k*local_size/NS;i<(k+1)*local_size/NS;i++) 
        {
            local_buf[i] = rank*local_size + i; // initialize the kth part of local data
            for(j=0;j<40;j++);  // artificial delay (may be optimized away by the compiler)
        }
        mytime(t_end);      // end of computation
        comp_time += (t_end - t_begin);

        // communication
        t_begin = t_end;    // begin of communication
        for(i=0;i<size;i++)
            displs[k*size+i] = i*local_size + k*local_size/NS;     // displacement of each process in the recvbuf for the kth segment
        int *offset=local_buf + k*local_size/NS;    // the start address of sendbuf
        MPI_Iallgatherv(offset, local_size/NS, MPI_INT, global_buf, recvcnts, &displs[k*size], MPI_INT, MPI_COMM_WORLD, &request[k]);
        mytime(t_end);      // end of communication
        comm_time += (t_end - t_begin);
    }
    
    mytime(t_begin);    // begin of wait
    for(k=0;k<NS;k++)
        MPI_Wait(&request[k],&status[k]);
    mytime(t_end);      // end of wait
    wait_time += (t_end - t_begin);
    
    /********************************************/

    int err = 0;
    for(i=0;i<global_size;i++)  // check the recvbuf
        if( global_buf[i] != i ) err++;

    if( err )
        printf("rank[%d] failed\n",rank);
    else
        printf("rank[%d] success\n",rank);

    if(rank==0) // print the final result
    {
        printf("Number of processes:    %d\n",size);
        printf("Computation time  :     %d us\n",comp_time);
        printf("Communication time:     %d us\n",comm_time);
        printf("Wait time:              %d us\n",wait_time);
    }
    MPI_Finalize();
    return 0;
}
