Hi All,
I have written a program where MPI master sends and receives large
amount of data i.e sending from 1KB to 1MB of data.
The amount of data to be sent with each call is different
The program runs well when running with 5 slaves, but when i try to
run the same program with 9 slaves, it gives me
MPI_Recv:MPI_ERR_TRUNCATE: message truncated error.
I am using boost MPI and boost serialization libraries for sending data.
I understand that the internal buffer on the master are overrun in
this case. Is there a way i can increase the buffer sizes ?
Here is the output:
-bash-3.2$ mpirun -np 9 --hostfile hostfile2 --rankfile rankfile2
$BENCHMARKS_ROOT/bin/boost_binomial_LB 10 5000_steps.txt
5000_homo_bytes.txt
Master: Starting Binomial Option Price calculations for American call
option
Master: Current stock price: 110
Master: Strike price: 100
Master: Risk-free rate: 1.05
Master: Volatility (annualized): 0.15
Master: Time (years): 1
Master: Number of calculations: 10
Slave 1:Going to Received Skeleton: 1
Slave 1:Received Skeleton: 1
Slave 1:Gpoing to Received Payload: 1
Slave 1:Received Payload: 1
Master: Sent initial message
Master: Sent initial message
Master: Sent initial message
Slave 2:Going to Received Skeleton: 2
Slave 2:Received Skeleton: 2
Slave 2:Gpoing to Received Payload: 2
Slave 2:Received Payload: 2
Slave 3:Going to Received Skeleton: 3
Slave 3:Received Skeleton: 3
Slave 3:Gpoing to Received Payload: 3
Slave 3:Received Payload: 3
Slave 4:Going to Received Skeleton: 4
Slave 4:Received Skeleton: 4
Slave 4:Gpoing to Received Payload: 4
Slave 1: Sent Response Skeleton: 1
Master: Sent initial message
Slave 4:Received Payload: 4
Slave 5:Going to Received Skeleton: 5
terminate called after throwing an instance of
'boost
::exception_detail
::clone_impl
<boost::exception_detail::error_info_injector<boost::mpi::exception> >'
what(): MPI_Recv: MPI_ERR_TRUNCATE: message truncated
[rh5x64-u12:26987] *** Process received signal ***
[rh5x64-u12:26987] Signal: Aborted (6)
[rh5x64-u12:26987] Signal code: (-6)
[rh5x64-u12:26987] [ 0] /lib64/libpthread.so.0 [0x3ba680e7c0]
[rh5x64-u12:26987] [ 1] /lib64/libc.so.6(gsignal+0x35) [0x3ba5c30265]
[rh5x64-u12:26987] [ 2] /lib64/libc.so.6(abort+0x110) [0x3ba5c31d10]
[rh5x64-u12:26987] [ 3] /usr/lib64/libstdc++.so.
6(_ZN9__gnu_cxx27__verbose_terminate_handlerEv+0x114) [0x3bb7abec44]
[rh5x64-u12:26987] [ 4] /usr/lib64/libstdc++.so.6 [0x3bb7abcdb6]
[rh5x64-u12:26987] [ 5] /usr/lib64/libstdc++.so.6 [0x3bb7abcde3]
[rh5x64-u12:26987] [ 6] /usr/lib64/libstdc++.so.6 [0x3bb7abceca]
[rh5x64-u12:26987] [ 7] /userdata/testing/benchmark_binaries/bin/
boost_binomial_LB(_ZN5boost15throw_exceptionINS_3mpi9exceptionEEEvRKT_
+0x172) [0x4216a2]
[rh5x64-u12:26987] [ 8] /usr/local/lib/libboost_mpi.so.
1.42.0
(_ZN5boost3mpi6detail19packed_archive_recvEP19ompi_communicator_tiiRNS0_15packed_iarchiveER20ompi_status_public_t
+0x16b) [0x2b0317faa6b3]
[rh5x64-u12:26987] [ 9] /usr/local/lib/libboost_mpi.so.
1.42.0
(_ZNK5boost3mpi12communicator4recvINS0_15packed_iarchiveEEENS0_6statusEiiRT_
+0x40) [0x2b0317f9c72a]
[rh5x64-u12:26987] [10] /usr/local/lib/libboost_mpi.so.
1.42.0
(_ZNK5boost3mpi12communicator4recvINS0_24packed_skeleton_iarchiveEEENS0_6statusEiiRT_
+0x38) [0x2b0317f9c76c]
[rh5x64-u12:26987] [11] /userdata/testing/benchmark_binaries/bin/
boost_binomial_LB
(_ZNK5boost3mpi12communicator4recvI31Binomial_Option_Pricing_RequestEENS0_6statusEiiRKNS0_14skeleton_proxyIT_EE
+0x121) [0x4258c1]
[rh5x64-u12:26987] [12] /userdata/testing/benchmark_binaries/bin/
boost_binomial_LB(main+0x409) [0x41d369]
[rh5x64-u12:26987] [13] /lib64/libc.so.6(__libc_start_main+0xf4)
[0x3ba5c1d994]
[rh5x64-u12:26987] [14] /userdata/testing/benchmark_binaries/bin/
boost_binomial_LB(__gxx_personality_v0+0x399) [0x419e69]
[rh5x64-u12:26987] *** End of error message ***
[rh5x64-u11.zlab.local][[47840,1],0][btl_tcp_frag.c:
216:mca_btl_tcp_frag_recv] mca_btl_tcp_frag_recv: readv failed:
Connection reset by peer (104)
--------------------------------------------------------------------------
mpirun noticed that process rank 5 with PID 26987 on node 172.10.0.112
exited on signal 6 (Aborted).
--------------------------------------------------------------------------
Here is the program code:
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <algorithm>
#include <numeric>
#include <functional>
#include <iomanip>
#include <cstdlib>
#include <cmath>
#include <limits>
#include <vector>
#include <sstream>
#include <fstream>
#include <streambuf>
#include <mpi.h>
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/config.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/string.hpp>
#include <boost/mpi/skeleton_and_content.hpp>
#include <boost/mpi/datatype.hpp>
#include <boost/archive/tmpdir.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/mpi.hpp>
#include <boost/tokenizer.hpp>
#include <boost/archive/tmpdir.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
#include "ace/OS_NS_sys_time.h"
#include "ace/OS_NS_time.h"
#include "ace/Profile_Timer.h"
using namespace MPI;
using std::scientific;
using namespace std;
namespace mpi = boost::mpi;
#define STOPTAG 0
std::ofstream output_file;
static void master (int & n_calls,
std::string &step_file_name,
std::string &byte_file_name,
mpi::communicator &world);
static void slave (mpi::communicator &world);
struct Binomial_Option_Pricing_Request
{
double cur_stock_price;
double strike_price;
double risk_free_rate;
double volatility;
double t;
int n_steps;
double option_price;
std::vector<char> payload;
};
namespace boost
{
namespace serialization
{
template<class Archive>
void serialize (Archive &ar,
struct
Binomial_Option_Pricing_Request &bopr,
unsigned int version)
{
ar & bopr.cur_stock_price;
ar & bopr.strike_price;
ar & bopr.risk_free_rate;
ar & bopr.volatility;
ar & bopr.t;
ar & bopr.n_steps;
ar & bopr.option_price;
ar & bopr.payload;
}
}
}
BOOST_IS_MPI_DATATYPE (Binomial_Option_Pricing_Request);
int
main (int argc, char **argv)
{
mpi::environment env (argc, argv);
mpi::communicator world;
std::string step_file_name;
int n_calls;
read_input (argv[1], &n_calls);
read_input (argv[2], &step_file_name);
std::string byte_file_name;
read_input (argv[3], &byte_file_name);
if (world.rank () == 0)
{
master (n_calls, step_file_name, byte_file_name, world);
}
else
{
slave (world);
}
MPI_Finalize ();
return 0;
}
static void
master (int & n_calls,
std::string &step_file_name,
std::string &byte_file_name,
mpi::communicator &world)
{
int n_tasks = world.size ();
int rank;
const double cur_stock_price = 110.0;
const double strike_price = 100.0;
const double risk_free_rate = 1.05; // Risk-free interest rate
const double volatility = 0.15; // Annualized volatility
const double t = 1.0; // In years
int request_count = 0;
int reply_count = 0;
int vector_count = 0;
std::vector<Binomial_Option_Pricing_Request> requests (n_calls);
std::vector<Binomial_Option_Pricing_Request> replies(n_calls);
for (std::vector<Binomial_Option_Pricing_Request>::iterator
it = requests.begin();
it != requests.end(); ++it)
{
Binomial_Option_Pricing_Request& r(*it);
r.cur_stock_price = cur_stock_price;
r.strike_price = strike_price;
r.risk_free_rate = risk_free_rate;
r.volatility = volatility;
r.t = t;
r.n_steps = step_vector[vector_count];
// resize vector for sending heterogenous payload
r.payload.resize (byte_vector[vector_count]);
// Initialize payload
std::for_each (r.payload.begin (),
r.payload.end (),
Initialize_Byte_Vector ());
++vector_count;
}
for (rank = 1; rank < n_tasks; ++rank)
{
// send Binomial_Option_Pricing_Request skeleton
// resize vector for sending heterogenous payload
world.send (rank,
request_count + 1,
mpi::skeleton(requests[request_count]));
// send Binomial_Option_Pricing_Request data
world.send (rank,
request_count + 1,
mpi::get_content(requests[request_count]));
std::cout << "Master: Sent initial message" << std::endl;
requests[request_count].payload.resize(0);
++request_count;
}
while (request_count < n_calls)
{
Binomial_Option_Pricing_Request bopr_reply_data;
mpi::status msg = world.probe ();
// Receive reply skeleton
world.recv (msg.source (), msg.tag (), mpi::skeleton
(bopr_reply_data));
std::cout << "Master: Received reply skeleton
from:"<<msg.source()<<"for"<<msg.tag ()<<std::endl;
// Receive reply
world.recv (msg.source (), msg.tag (), mpi::get_content
(bopr_reply_data));
std::cout << "Master: Received reply from:"<<msg.source()<<"for
"<<
msg.tag() << std::endl;
bopr_reply_data.payload.resize (0);
replies.push_back (bopr_reply_data);
++reply_count;
world.send (msg.source (),
request_count +
1,
mpi::skeleton(requests[request_count]));
std::cout << "Master:Sent message skeleton
to :"<<msg.source()<<std::endl;
// send Binomial_Option_Pricing_Request data
world.send (msg.source (),
request_count +
1,
mpi::get_content(requests[request_count]));
std::cout << "Master:Sent message to
:"<<msg.source()<<std::endl;
//requests[request_count].payload.resize(0);
++request_count;
// store reply
}
while (reply_count < n_calls)
{
std::cout <<" Master Inside final loop" <<std::endl;
Binomial_Option_Pricing_Request bopr_reply_data;
mpi::status msg = world.probe ();
// Receive reply skeleton
world.recv (msg.source (), msg.tag (), mpi::skeleton
(bopr_reply_data));
// Receive reply
world.recv (msg.source (), msg.tag (), mpi::get_content
(bopr_reply_data));
bopr_reply_data.payload.resize (0);
// store reply
replies.push_back (bopr_reply_data);
++reply_count;
}
for (int rank = 1; rank < n_tasks; ++rank)
{
Binomial_Option_Pricing_Request bopr_stop_data;
world.send (rank, STOPTAG, bopr_stop_data);
}
}
static void
slave (mpi::communicator &world)
{
int my_rank = world.rank ();
int count = 0;
while (1)
{
Binomial_Option_Pricing_Request bopr_call_data;
mpi::status msg = world.probe ();
if (msg.tag () == STOPTAG)
{
break;
}
else
{
world.recv (0, msg.tag (), mpi::skeleton
(bopr_call_data));
std::cout << "Slave " << world.rank () << ":Received Skeleton:
"<<msg.tag() << std::endl;
world.recv (0, msg.tag (), mpi::get_content (bopr_call_data));
std::cout << "Slave " << world.rank () << ":Received Payload:
"<<msg.tag() << std::endl;
bopr_call_data.option_price =
option_price_call_american_binomial
(bopr_call_data.cur_stock_price,
bopr_call_data.strike_price,
bopr_call_data.risk_free_rate,
bopr_call_data.volatility,
bopr_call_data.t,
bopr_call_data.n_steps);
world.isend (0, msg.tag (), mpi::skeleton
(bopr_call_data));
std::cout << "Slave " << world.rank () << ": Sent Response
Skeleton: "<<msg.tag() << std::endl;
world.isend (0, msg.tag (), mpi::get_content
(bopr_call_data));
std::cout << "Slave " << world.rank () << ": Sent Response Payload:
"<<msg.tag() << std::endl;
++count;
}
}
std::cout << "Slave: " << my_rank << " : "
<< "Number of requests processed: "
<< count << std::endl;
}
Thanks,
Pooja