Hi Loh,
I used MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); in my
program and got provided = 0, which turns out to be MPI_THREAD_SINGLE. Does
this mean that I cannot use the MPI_THREAD_MULTIPLE model? I wrote a little
program to test the multithreading: it worked sometimes, failed sometimes,
and sometimes it hung. Is this only because MPI_THREAD_MULTIPLE is not
supported, or are there bugs in the program? I attached the little program
as follows:
#include <iostream>
#include <pthread.h>
#include <fstream>
#include <sstream>
#include <string.h>
#include "mpi.h"
using namespace std;
#define MSG_QUERY_SIZE 16 //sizeof(MPI_query_msg) = 16
// Fixed-size (4 x int = 16 bytes) message exchanged between ranks; sent and
// received as MSG_QUERY_SIZE raw MPI_CHARs, so its layout must stay exactly
// four ints with no padding.
struct MPI_query_msg
{
    int flag;    // -1: request cell; 0: query coordinate; 1: there is no cell to grant
    int x;       // cell x coordinate (negative means "not a coordinate")
    int y;       // cell y coordinate (negative means "not a coordinate")
    int ignited; // if x,y are not negative: 0 = not ignited, 1 = ignited
};
// Background receiver thread (one per slave rank).
//
// Logs every tag-222 message it receives to a per-rank file
// ("f_<rank>_backRecv.txt") and exits when a termination message
// (flag == -1) arrives.
//
// NOTE(review): calling MPI_Recv from a thread other than the one that
// initialized MPI is only legal when MPI_Init_thread granted
// MPI_THREAD_MULTIPLE. If provided == MPI_THREAD_SINGLE (as reported in
// the question above), this usage is undefined and can hang or crash.
//
// Fixes vs. the posted version:
//  - removed pthread_mutex_init(&_dealmutex2, ...): _dealmutex2 is not
//    declared anywhere in the file and the mutex is never locked/unlocked.
//  - removed the email-wrapped, half-commented MPI_Irecv/MPI_Wait lines
//    whose continuation lines were live code (compile error).
//  - removed unused locals (left, right, x, y, m, count, req2).
void* backRecv(void* arg)
{
    (void)arg; // thread argument is unused

    int myid = 0;
    int nprocs = 0;
    MPI_Status status;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    cout << myid << " create background message recv" << endl;

    // Build the per-rank log file name: f_<rank>_backRecv.txt
    stringstream RANK;
    RANK << myid;
    string filename("f_");
    filename += RANK.str();
    filename += "_backRecv.txt";
    fstream fout(filename.c_str(), ios::out);
    if(!fout)
    {
        cout << "can not create the file " << filename << endl;
        exit(1);
    }

    MPI_query_msg rMSG;
    while(true)
    {
        // Blocking receive of one ring message from any source, tag 222.
        MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 222,
                 MPI_COMM_WORLD, &status);
        fout << "BACKREV:" << myid << " recv from " << status.MPI_SOURCE
             << " rMSG.flag = " << rMSG.flag << " with tag 222" << endl;
        fout.flush();
        if(rMSG.flag == -1) // termination message forwarded around the ring
        {
            fout << "*******backRecv FINISHED IN " << myid << "********" << endl;
            fout.flush();
            fout.close();
            break;
        }
    }
    return 0; // normal thread exit (equivalent to pthread_exit(NULL))
}
// Test driver: the last rank (nprocs-1) acts as the master; every other rank
// is a slave that spawns a backRecv pthread and talks to the master while
// forwarding replies around a ring of slaves (tag 222).
//
// Fixes vs. the posted version:
//  - removed pthread_mutex_init(&_dealmutex, ...): _dealmutex is not declared
//    anywhere in the file and never used (compile error).
//  - removed the email-wrapped, half-commented MPI_Isend/Irecv/Wait lines
//    whose continuation lines were live code (compile error).
//  - removed unused locals (pt2, pret2, pret-spares, requ1, requ2, status1,
//    left, m).
//  - every rank now warns when MPI_THREAD_MULTIPLE was not granted: with
//    provided == MPI_THREAD_SINGLE, the MPI calls made from backRecv are
//    undefined behavior and explain the intermittent hangs/failures.
//  - the busy-wait delay uses a volatile counter so the compiler cannot
//    optimize the loop away.
int main(int argc, char **argv)
{
    int myid = 0;
    int nprocs = 0;
    int provided = MPI_THREAD_SINGLE;

    // Ask for full multithreading: the main thread and the backRecv pthread
    // both make MPI calls concurrently.
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if(provided < MPI_THREAD_MULTIPLE)
    {
        // The library may legally return less than requested; in that case
        // this whole program's threading model is unsupported.
        cout << myid << " WARNING: MPI_THREAD_MULTIPLE not provided;"
             << " multithreaded MPI calls below are not safe" << endl;
    }

    if(myid == nprocs - 1) // the last rank is the master
    {
        if(provided == MPI_THREAD_MULTIPLE)
        {
            cout << myid << " got MPI_THREAD_MULTIPLE " << endl;
        }
        else
        {
            cout << myid << " MPI_THREAD_MULTIPLE = " << MPI_THREAD_MULTIPLE << endl;
            cout << myid << " MPI_THREAD_SINGLE = " << MPI_THREAD_SINGLE << endl;
            cout << myid << " got provided = " << provided << endl;
        }

        MPI_Status status2;
        MPI_query_msg sMSGqueue[50], rMSG;
        for(int i = 0; i < 50; i++)
        {
            sMSGqueue[i].flag = i;
            sMSGqueue[i].x = i;
            sMSGqueue[i].y = i;
            sMSGqueue[i].ignited = i;
        }

        // Phase 1: answer 50 requests, echoing queue entry j back to the
        // requesting slave with the same tag it used.
        int j = 0;
        while(j < 50)
        {
            MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status2);
            cout << "MAIN(" << j << "): " << myid << " recvs from "
                 << status2.MPI_SOURCE << " with tag = " << status2.MPI_TAG
                 << " rMSG.flag = " << rMSG.flag << endl;
            MPI_Send(&(sMSGqueue[j]), MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE,
                     status2.MPI_TAG, MPI_COMM_WORLD);
            cout << "MAIN(" << j << "): " << myid << " sends to "
                 << status2.MPI_SOURCE << " with tag = " << status2.MPI_TAG
                 << " sMSGqueue[j].flag = " << sMSGqueue[j].flag << endl;
            j++;
        }

        // Phase 2: reply flag = -1 (termination) to each further request.
        // There are myid == nprocs-1 slaves, so stop after that many.
        int count = 0;
        while(true)
        {
            MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status2);
            rMSG.flag = -1;
            MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE,
                     status2.MPI_TAG, MPI_COMM_WORLD);
            cout << "MAIN sends termination to " << status2.MPI_SOURCE << endl;
            count++;
            if(count == myid)
                break;
        }
        cout << "*******************************MAIN SUCESS!" << endl;
    }
    else // slave rank
    {
        // Spawn the background receiver before entering the request loop.
        pthread_t pt1;
        int pret1 = pthread_create(&pt1, NULL, backRecv, NULL);
        if(pret1 != 0)
        {
            cout << myid << "backRecv Thread Create Failed." << endl;
            exit(1);
        }

        MPI_Status status2;
        MPI_query_msg sMSG, rMSG;
        sMSG.flag = myid;
        sMSG.x = myid;
        sMSG.y = myid;
        sMSG.ignited = myid;
        rMSG = sMSG;

        // Right neighbour in the ring formed by the nprocs-1 slaves
        // (ranks 0 .. nprocs-2).
        int right = (myid + 1) % (nprocs - 1);

        while(true)
        {
            // Ask the master (tag = own rank keeps request/reply matched).
            MPI_Send(&sMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs - 1, myid, MPI_COMM_WORLD);
            cout << "SLAVE: " << myid << " sends to " << nprocs - 1
                 << " sMSG.x = " << sMSG.x << endl;
            MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs - 1, myid, MPI_COMM_WORLD,
                     &status2);
            cout << "SLAVE: " << myid << " recvs from " << nprocs - 1
                 << " rMSG.flag = " << rMSG.flag << endl;
            // Forward the reply to the right neighbour's backRecv thread
            // (tag 222); a flag of -1 also terminates that thread.
            MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD);
            if(rMSG.flag == -1)
                break;
            // Crude per-rank stagger delay; volatile so it is not elided.
            for(volatile int d = 0; d < (myid + 1) * 300; ++d)
            {}
        }
        cout << "*******************************SLAVE" << myid << " SUCESS!" << endl;
        pthread_join(pt1, NULL);
    }

    MPI_Finalize();
    return 0;
}
BTW, if I want to create a background thread that acts like a daemon
thread, how can I achieve that in MPI programs? Thanks.
List-Post: [email protected]
Date: Tue, 22 Sep 2009 10:32:50 -0700
From: [email protected]
To: [email protected]
Subject: Re: [OMPI users] How to create multi-thread parallel program using
thread-safe send and recv?
guosong wrote:
Thanks for responding. I used a Linux cluster. I think I would like to create a
model that is multithreaded and each thread can make MPI calls. I attached test
code as follows. It has two pthreads and there are MPI calls in both of those
two threads. In the main function, there are also MPI calls. Should I use
full multithreading? I guess so. It seems like the created threads are expected
to make independent/concurrent message-passing calls. Do read the link I sent.
You need to convert from MPI_Init to MPI_Init_thread(), asking for a
full-multithreaded model and checking that you got it. Also note in main()
that the MPI_Isend() calls should be matched with MPI_Wait() or similar calls.
I guess the parent thread will sit in such calls while the child threads do
their own message passing. Good luck.
_________________________________________________________________
约会说不清地方?来试试微软地图最新msn互动功能!
http://ditu.live.com/?form=TL&swm=1