I've been playing with some code to become familiar with
MPI_Comm_accept and MPI_Comm_connect so that I can implement an MPI
client/server. The code simply sends a single MPI_INT, the client
process pid, to the server and then disconnects. It works for a few
test runs, but on the 2nd or 3rd client connection the server seems to
stop responding and the client spins at 100% CPU in its call to
MPI_Comm_connect. Am I doing something wrong in my code? Thanks in
advance for any help. (A stripped-down sketch of the intended exchange
in plain C MPI calls follows the example runs, before the full
sources.) First, an example run ...
In terminal #1, start the name service
$ ompi-server -r ${PREFIX}/var/run/ompi-server/uri.txt
$
In terminal #2, start the server code
$ mpirun -mca btl tcp,sm,self \
--ompi-server file:${PREFIX}/var/run/ompi-server/uri.txt mpi-server
mpi-server pid 41556
Opened port 2011758592.0;tcp://10.161.1.73:51113+2011758593.0;\
tcp://10.161.1.73:51114:300
MPI_Info_set("ompi_global_scope", "true")
Published {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Waiting to accept a connection on {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Accepted a connection on {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"} \
with rank 0 and size 1
We detached from the client thread...
Waiting for an int...
Waiting to accept a connection on {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Received 41558
Disconnecting intercomm
Accepted a connection on {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"} \
with rank 0 and size 1
We detached from the client thread...
Waiting to accept a connection on {"mpi-server-example", "Waiting for an int...
2011758592.0;tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Received 41560
Disconnecting intercomm
^Cmpirun: killing job...
In terminal #3, the client runs
$ mpirun -mca btl tcp,sm,self \
--ompi-server file:${PREFIX}/var/run/ompi-server/uri.txt mpi-client
mpi-client pid 41558
Looking up port for service "mpi-server-example
Found {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Connected to {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"} \
with rank 0 and size 1
Sending to {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Disconnecting intercomm
$ mpirun -mca btl tcp,sm,self \
--ompi-server file:${PREFIX}/var/run/ompi-server/uri.txt mpi-client
mpi-client pid 41560
Looking up port for service "mpi-server-example
Found {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Connected to {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"} \
with rank 0 and size 1
Sending to {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
Disconnecting intercomm
$ mpirun -mca btl tcp,sm,self \
--ompi-server file:${PREFIX}/var/run/ompi-server/uri.txt mpi-client
mpi-client pid 41562
Looking up port for service "mpi-server-example
Found {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
^Cmpirun: killing job...
$ mpirun -mca btl tcp,sm,self \
--ompi-server file:${PREFIX}/var/run/ompi-server/uri.txt mpi-client
mpi-client pid 41564
Looking up port for service "mpi-server-example
Found {"mpi-server-example", "2011758592.0;\
tcp://10.161.1.73:51113+2011758593.0;tcp://10.161.1.73:51114:300"}
^Cmpirun: killing job...
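Before the full sources, here is roughly what the exchange boils down
to with my wrappers stripped away: a minimal sketch using the plain C
MPI calls, run under mpirun with --ompi-server as above. Error
handling is omitted, the file names sketch-server.cc / sketch-client.cc
are just placeholders, and the receive is done inline here rather than
in a detached thread as my real server does.
// sketch-server.cc -- minimal accept loop (placeholder name)
#include <mpi.h>
#include <cstdio>
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);
    char service[] = "mpi-server-example";
    char port[MPI_MAX_PORT_NAME];
    MPI_Open_port(MPI_INFO_NULL, port);
    MPI_Publish_name(service, MPI_INFO_NULL, port);
    for (;;) {
        MPI_Comm client;
        // Block until a client connects on the published port.
        MPI_Comm_accept(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &client);
        int pid = 0;
        MPI_Recv(&pid, 1, MPI_INT, MPI_ANY_SOURCE, 0, client,
                 MPI_STATUS_IGNORE);
        std::printf("received %d\n", pid);
        MPI_Comm_disconnect(&client);
    }
    // never reached; cleanup would be MPI_Unpublish_name,
    // MPI_Close_port, MPI_Finalize
}
// sketch-client.cc -- connect, send our pid, disconnect (placeholder name)
#include <mpi.h>
#include <unistd.h>
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);
    char service[] = "mpi-server-example";
    char port[MPI_MAX_PORT_NAME];
    MPI_Lookup_name(service, MPI_INFO_NULL, port);
    MPI_Comm server;
    MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &server);
    int pid = static_cast<int>(getpid());
    MPI_Send(&pid, 1, MPI_INT, 0, 0, server);
    MPI_Comm_disconnect(&server);
    MPI_Finalize();
}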
The source code for the server is as follows
$ cat src/mpi-server.cc
#include "seed/mpi_info.hh"
#include "seed/mpi_intercomm.hh"
#include "seed/scope_exit.hh"
#include <mpi.h>
#include <array>
#include <cstdlib>
#include <iostream>
#include <ostream>
#include <string>
#include <thread>
#include <unistd.h>  // getpid()
int main(int argc, char* argv[])
{
std::clog << argv[0] << " pid " << getpid() << '\n';
shor::seed::Scope_exit finalize(
[]() {
if (MPI::Is_initialized()) {
MPI::Finalize();
}
});
const auto required = MPI_THREAD_MULTIPLE;
const auto provided = MPI::Init_thread(argc, argv, required);
if (provided < required) {
std::cerr << "Error: could not init with MPI_THREAD_MULTIPLE\n";
return EXIT_FAILURE;
}
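// Open a port and publish it under a well-known service name so that
// clients can find it through ompi-server.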
typedef std::array<char, MPI_MAX_PORT_NAME> Port_name;
Port_name port_name;
MPI::Open_port(MPI_INFO_NULL, port_name.data());
shor::seed::Scope_exit close_port(
[port_name]() {
std::clog << "Closing port " << port_name.data() << '\n';
MPI::Close_port(port_name.data());
});
std::clog << "Opened port " << port_name.data() << '\n';
const std::string service_name = "mpi-server-example";
const shor::seed::Mpi_info pub_info({{"ompi_global_scope", "true"}});
MPI::Publish_name(service_name.c_str(), pub_info, port_name.data());
shor::seed::Scope_exit unpublish_name(
[port_name, service_name, &pub_info]() {
std::clog
<< "Unpublishing {\"" << service_name
<< "\", \"" << port_name.data() << "\"}\n";
MPI::Unpublish_name(
service_name.c_str(), pub_info, port_name.data());
});
std::clog
<< "Published {\"" << service_name << "\", \"" << port_name.data()
<< "\"}\n";
while (true) {
std::clog
<< "Waiting to accept a connection on {\"" << service_name
<< "\", \"" << port_name.data() << "\"}\n";
shor::seed::Mpi_intercomm comm(
MPI::COMM_SELF.Accept(port_name.data(), MPI_INFO_NULL, 0));
std::clog
<< "Accepted a connection on {\"" << service_name
<< "\", \"" << port_name.data() << "\"} with rank "
<< comm->Get_rank() << " and size " << comm->Get_size()
<< '\n';
const auto lambda = [](shor::seed::Mpi_intercomm comm) {
int n;
std::clog << "Waiting for an int...\n";
comm->Recv(&n, 1, MPI_INT, MPI_ANY_SOURCE, 0);
std::clog << "Received " << n << '\n';
};
std::thread a_thread(lambda, std::move(comm));
a_thread.detach();
std::clog << "We detached from the client thread...\n";
}
}
$
The client
$ cat src/mpi-client.cc
#include "seed/mpi_intercomm.hh"
#include "seed/scope_exit.hh"
#include <mpi.h>
#include <array>
#include <cstdlib>  // EXIT_FAILURE
#include <iostream>
#include <ostream>
#include <string>
#include <unistd.h>
int main(int argc, char* argv[])
{
std::clog << argv[0] << " pid " << getpid() << '\n';
shor::seed::Scope_exit finalize(
[]() {
if (MPI::Is_initialized()) {
MPI::Finalize();
}
});
const auto required = MPI_THREAD_MULTIPLE;
const auto provided = MPI::Init_thread(argc, argv, required);
if (provided < required) {
std::cerr << "Error: could not init with MPI_THREAD_MULTIPLE\n";
return EXIT_FAILURE;
}
const std::string service_name = "mpi-server-example";
std::clog
<< "Looking up port for service \"" << service_name << '\n';
std::array<char, MPI_MAX_PORT_NAME> port_name;
MPI::Lookup_name(
service_name.c_str(), MPI_INFO_NULL, port_name.data());
std::clog
<< "Found {\"" << service_name << "\", \"" << port_name.data()
<< "\"}\n";
shor::seed::Mpi_intercomm comm(
MPI::COMM_SELF.Connect(port_name.data(), MPI_INFO_NULL, 0));
std::clog
<< "Connected to {\"" << service_name << "\", \""
<< port_name.data() << "\"} with rank " << comm->Get_rank()
<< " and size " << comm->Get_size() << '\n';
std::clog
<< "Sending to {\"" << service_name << "\", \""
<< port_name.data() << "\"}\n";
const int n = getpid();
comm->Send(&n, 1, MPI_INT, 0, 0);
}
$
And a few helpers used by both
$ cat include/seed/mpi_intercomm.hh
#ifndef INCLUDE_SEED_MPI_INTERCOMM_HH
#define INCLUDE_SEED_MPI_INTERCOMM_HH
#include <mpi.h>
#include <boost/optional.hpp>
namespace shor {
namespace seed {
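// Move-only RAII wrapper around MPI::Intercomm: the destructor calls
// Disconnect() on any communicator it still owns.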
class Mpi_intercomm {
boost::optional<MPI::Intercomm> impl_;
public:
explicit Mpi_intercomm(MPI::Intercomm impl);
Mpi_intercomm(const Mpi_intercomm& that) = delete;
Mpi_intercomm(Mpi_intercomm&& that);
~Mpi_intercomm();
Mpi_intercomm& operator = (const Mpi_intercomm& that) = delete;
Mpi_intercomm& operator = (Mpi_intercomm&& that);
MPI::Intercomm* operator -> ();
const MPI::Intercomm* operator -> () const;
operator MPI::Intercomm() const;
};
} // namespace seed
} // namespace shor
#include "seed/mpi_intercomm.cc.hh"
#endif
$ cat include/seed/mpi_intercomm.cc.hh
#ifndef INCLUDE_SEED_MPI_INTERCOMM_CC_HH
#define INCLUDE_SEED_MPI_INTERCOMM_CC_HH
#include <mpi.h>
inline MPI::Intercomm*
shor::seed::Mpi_intercomm::operator -> ()
{
return &(*impl_);
}
inline const MPI::Intercomm*
shor::seed::Mpi_intercomm::operator -> () const
{
return &(*impl_);
}
inline
shor::seed::Mpi_intercomm::operator MPI::Intercomm() const
{
return *impl_;
}
#endif
$ cat src/mpi_intercomm.cc
#include "seed/mpi_intercomm.hh"
shor::seed::Mpi_intercomm::Mpi_intercomm(
MPI::Intercomm impl)
: impl_(impl)
{ }
shor::seed::Mpi_intercomm::Mpi_intercomm(
Mpi_intercomm&& that)
: impl_(that.impl_)
{
that.impl_ = boost::none;
}
shor::seed::Mpi_intercomm::~Mpi_intercomm()
{
if (impl_
&& (*impl_ != MPI::COMM_WORLD) && (*impl_ != MPI::COMM_SELF))
{
std::clog << "Disconnecting intercomm\n";
impl_->Disconnect();
impl_ = boost::none;
}
}
shor::seed::Mpi_intercomm&
shor::seed::Mpi_intercomm::operator = (
Mpi_intercomm&& that)
{
impl_ = that.impl_;
that.impl_ = boost::none;
return *this;
}
$ cat include/seed/mpi_info.hh
#ifndef INCLUDE_SEED_MPI_INFO_HH
#define INCLUDE_SEED_MPI_INFO_HH
#include <mpi.h>
#include <initializer_list>
#include <utility>  // std::pair
namespace shor {
namespace seed {
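// Move-only wrapper around MPI::Info; the underlying Info object is
// freed in the destructor.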
class Mpi_info {
MPI::Info impl_;
public:
typedef std::pair<const char*, const char*> Key_value;
typedef std::initializer_list<Key_value> Init_list;
Mpi_info();
explicit Mpi_info(const Init_list& some_values);
Mpi_info(const Mpi_info& that) = delete;
Mpi_info(Mpi_info&&);
~Mpi_info();
Mpi_info& operator = (const Mpi_info& that) = delete;
Mpi_info& operator = (Mpi_info&& that);
operator MPI::Info() const;
};
} // namespace seed
} // namespace shor
#include "seed/mpi_info.cc.hh"
#endif
$ cat include/seed/mpi_info.cc.hh
#ifndef INCLUDE_SEED_MPI_INFO_CC_HH
#define INCLUDE_SEED_MPI_INFO_CC_HH
#include "seed/mpi_info.hh"
inline shor::seed::Mpi_info::operator MPI::Info() const
{
return impl_;
}
#endif
$ cat src/mpi_info.cc
#include "seed/mpi_info.hh"
#include <algorithm>
#include <initializer_list>
#include <iostream>  // std::clog
shor::seed::Mpi_info::Mpi_info()
: impl_(MPI::Info::Create())
{ }
shor::seed::Mpi_info::Mpi_info(
const Init_list& some_values)
: impl_(MPI::Info::Create())
{
std::for_each(
std::begin(some_values), std::end(some_values),
[this] (const Key_value& one_value) {
std::clog
<< "MPI_Info_set(\"" << std::get<0>(one_value)
<< "\", \"" << std::get<1>(one_value)
<< "\")\n";
impl_.Set(std::get<0>(one_value), std::get<1>(one_value));
});
}
shor::seed::Mpi_info::Mpi_info(Mpi_info&& that)
: impl_(that.impl_)
{
// Leave the moved-from object holding MPI_INFO_NULL so that only one
// destructor frees the Info object.
that.impl_ = MPI_INFO_NULL;
}
shor::seed::Mpi_info::~Mpi_info()
{
if (static_cast<MPI_Info>(impl_) != MPI_INFO_NULL) {
impl_.Free();
}
}
shor::seed::Mpi_info&
shor::seed::Mpi_info::operator = (Mpi_info&& that)
{
impl_ = that.impl_;
that.impl_ = MPI_INFO_NULL;
return *this;
}
$ cat include/seed/scope_exit.hh
#ifndef INCLUDE_SEED_SCOPE_EXIT_HH
#define INCLUDE_SEED_SCOPE_EXIT_HH
#include <functional>
namespace shor {
namespace seed {
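// Minimal scope guard: runs the stored callable from the destructor.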
class Scope_exit {
std::function<void()> lambda_;
public:
Scope_exit(std::function<void()> lambda) : lambda_(lambda) { }
Scope_exit(const Scope_exit& that) = delete;
~Scope_exit() { lambda_(); }
Scope_exit& operator = (const Scope_exit& that) = delete;
};
} // namespace seed
} // namespace shor
#endif
$
And here is the output of ompi_info
$ ompi_info
Package: Open MPI [email protected]
Distribution
Open MPI: 1.6.4
Open MPI SVN revision: r28081
Open MPI release date: Feb 19, 2013
Open RTE: 1.6.4
Open RTE SVN revision: r28081
Open RTE release date: Feb 19, 2013
OPAL: 1.6.4
OPAL SVN revision: r28081
OPAL release date: Feb 19, 2013
MPI API: 2.1
Ident string: 1.6.4
Prefix: ${PREFIX}
Configured architecture: x86_64-apple-darwin12.3.0
Configure host: Damien-Kicks-MacBook-Pro.local
Configured by: dkick
Configured on: Thu May 9 21:36:29 CDT 2013
Configure host: Damien-Kicks-MacBook-Pro.local
Built by: dkick
Built on: Thu May 9 21:53:32 CDT 2013
Built host: Damien-Kicks-MacBook-Pro.local
C bindings: yes
C++ bindings: yes
Fortran77 bindings: yes (single underscore)
Fortran90 bindings: yes
Fortran90 bindings size: small
C compiler: gcc
C compiler absolute: /usr/bin/gcc
C compiler family name: GNU
C compiler version: 4.8.0
C++ compiler: g++ --std=c++0x
C++ compiler absolute: /usr/bin/g++
Fortran77 compiler: gfortran
Fortran77 compiler abs: /sw/bin/gfortran
Fortran90 compiler: gfortran
Fortran90 compiler abs: /sw/bin/gfortran
C profiling: yes
C++ profiling: yes
Fortran77 profiling: yes
Fortran90 profiling: yes
C++ exceptions: yes
Thread support: posix (MPI_THREAD_MULTIPLE: yes, progress: no)
Sparse Groups: no
Internal debug support: no
MPI interface warnings: no
MPI parameter check: runtime
Memory profiling support: no
Memory debugging support: no
libltdl support: yes
Heterogeneous support: no
mpirun default --prefix: no
MPI I/O support: yes
MPI_WTIME support: gettimeofday
Symbol vis. support: yes
Host topology support: yes
MPI extensions: affinity example
FT Checkpoint support: no (checkpoint thread: no)
VampirTrace support: yes
MPI_MAX_PROCESSOR_NAME: 256
MPI_MAX_ERROR_STRING: 256
MPI_MAX_OBJECT_NAME: 64
MPI_MAX_INFO_KEY: 36
MPI_MAX_INFO_VAL: 256
MPI_MAX_PORT_NAME: 1024
MPI_MAX_DATAREP_STRING: 128
MCA backtrace: execinfo (MCA v2.0, API v2.0, Component v1.6.4)
MCA paffinity: hwloc (MCA v2.0, API v2.0, Component v1.6.4)
MCA carto: auto_detect (MCA v2.0, API v2.0, Component v1.6.4)
MCA carto: file (MCA v2.0, API v2.0, Component v1.6.4)
MCA shmem: mmap (MCA v2.0, API v2.0, Component v1.6.4)
MCA shmem: posix (MCA v2.0, API v2.0, Component v1.6.4)
MCA shmem: sysv (MCA v2.0, API v2.0, Component v1.6.4)
MCA maffinity: first_use (MCA v2.0, API v2.0, Component v1.6.4)
MCA maffinity: hwloc (MCA v2.0, API v2.0, Component v1.6.4)
MCA timer: darwin (MCA v2.0, API v2.0, Component v1.6.4)
MCA installdirs: env (MCA v2.0, API v2.0, Component v1.6.4)
MCA installdirs: config (MCA v2.0, API v2.0, Component v1.6.4)
MCA sysinfo: darwin (MCA v2.0, API v2.0, Component v1.6.4)
MCA hwloc: hwloc132 (MCA v2.0, API v2.0, Component v1.6.4)
MCA dpm: orte (MCA v2.0, API v2.0, Component v1.6.4)
MCA pubsub: orte (MCA v2.0, API v2.0, Component v1.6.4)
MCA allocator: basic (MCA v2.0, API v2.0, Component v1.6.4)
MCA allocator: bucket (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: basic (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: hierarch (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: inter (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: self (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: sm (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: sync (MCA v2.0, API v2.0, Component v1.6.4)
MCA coll: tuned (MCA v2.0, API v2.0, Component v1.6.4)
MCA io: romio (MCA v2.0, API v2.0, Component v1.6.4)
MCA mpool: fake (MCA v2.0, API v2.0, Component v1.6.4)
MCA mpool: rdma (MCA v2.0, API v2.0, Component v1.6.4)
MCA mpool: sm (MCA v2.0, API v2.0, Component v1.6.4)
MCA pml: bfo (MCA v2.0, API v2.0, Component v1.6.4)
MCA pml: csum (MCA v2.0, API v2.0, Component v1.6.4)
MCA pml: ob1 (MCA v2.0, API v2.0, Component v1.6.4)
MCA pml: v (MCA v2.0, API v2.0, Component v1.6.4)
MCA bml: r2 (MCA v2.0, API v2.0, Component v1.6.4)
MCA rcache: vma (MCA v2.0, API v2.0, Component v1.6.4)
MCA btl: self (MCA v2.0, API v2.0, Component v1.6.4)
MCA btl: sm (MCA v2.0, API v2.0, Component v1.6.4)
MCA btl: tcp (MCA v2.0, API v2.0, Component v1.6.4)
MCA topo: unity (MCA v2.0, API v2.0, Component v1.6.4)
MCA osc: pt2pt (MCA v2.0, API v2.0, Component v1.6.4)
MCA osc: rdma (MCA v2.0, API v2.0, Component v1.6.4)
MCA iof: hnp (MCA v2.0, API v2.0, Component v1.6.4)
MCA iof: orted (MCA v2.0, API v2.0, Component v1.6.4)
MCA iof: tool (MCA v2.0, API v2.0, Component v1.6.4)
MCA oob: tcp (MCA v2.0, API v2.0, Component v1.6.4)
MCA odls: default (MCA v2.0, API v2.0, Component v1.6.4)
MCA ras: cm (MCA v2.0, API v2.0, Component v1.6.4)
MCA ras: slurm (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: load_balance (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: rank_file (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: resilient (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: round_robin (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: seq (MCA v2.0, API v2.0, Component v1.6.4)
MCA rmaps: topo (MCA v2.0, API v2.0, Component v1.6.4)
MCA rml: oob (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: binomial (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: cm (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: direct (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: linear (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: radix (MCA v2.0, API v2.0, Component v1.6.4)
MCA routed: slave (MCA v2.0, API v2.0, Component v1.6.4)
MCA plm: rsh (MCA v2.0, API v2.0, Component v1.6.4)
MCA plm: slurm (MCA v2.0, API v2.0, Component v1.6.4)
MCA filem: rsh (MCA v2.0, API v2.0, Component v1.6.4)
MCA errmgr: default (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: env (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: hnp (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: singleton (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: slave (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: slurm (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: slurmd (MCA v2.0, API v2.0, Component v1.6.4)
MCA ess: tool (MCA v2.0, API v2.0, Component v1.6.4)
MCA grpcomm: bad (MCA v2.0, API v2.0, Component v1.6.4)
MCA grpcomm: basic (MCA v2.0, API v2.0, Component v1.6.4)
MCA grpcomm: hier (MCA v2.0, API v2.0, Component v1.6.4)
MCA notifier: command (MCA v2.0, API v1.0, Component v1.6.4)
MCA notifier: syslog (MCA v2.0, API v1.0, Component v1.6.4)
$