Hi All,

I have written a program in which the MPI master sends and receives large amounts of data, i.e. messages ranging from 1 KB to 1 MB.
The amount of data sent with each call is different.

The program runs fine with 5 slaves, but when I run the same program with 9 slaves, it fails with the error `MPI_Recv: MPI_ERR_TRUNCATE: message truncated`.

I am using the Boost.MPI and Boost.Serialization libraries to send the data.
My understanding is that the internal buffers on the master are being overrun in this case. Is there a way I can increase the buffer sizes?

Here is the output:
-bash-3.2$ mpirun -np 9 --hostfile hostfile2 --rankfile rankfile2 $BENCHMARKS_ROOT/bin/boost_binomial_LB 10 5000_steps.txt 5000_homo_bytes.txt
Master: Starting Binomial Option Price calculations for American call option
Master: Current stock price: 110
Master: Strike price: 100
Master: Risk-free rate: 1.05
Master: Volatility (annualized): 0.15
Master: Time (years): 1
Master: Number of calculations: 10

Slave 1:Going to Received Skeleton: 1
Slave 1:Received Skeleton: 1
Slave 1:Gpoing to Received Payload: 1
Slave 1:Received Payload: 1
Master: Sent initial message
Master: Sent initial message
Master: Sent initial message
Slave 2:Going to Received Skeleton: 2
Slave 2:Received Skeleton: 2
Slave 2:Gpoing to Received Payload: 2
Slave 2:Received Payload: 2
Slave 3:Going to Received Skeleton: 3
Slave 3:Received Skeleton: 3
Slave 3:Gpoing to Received Payload: 3
Slave 3:Received Payload: 3
Slave 4:Going to Received Skeleton: 4
Slave 4:Received Skeleton: 4
Slave 4:Gpoing to Received Payload: 4
Slave 1: Sent Response Skeleton: 1
Master: Sent initial message
Slave 4:Received Payload: 4
Slave 5:Going to Received Skeleton: 5
terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::mpi::exception> >'
  what():  MPI_Recv: MPI_ERR_TRUNCATE: message truncated
[rh5x64-u12:26987] *** Process received signal ***
[rh5x64-u12:26987] Signal: Aborted (6)
[rh5x64-u12:26987] Signal code:  (-6)
[rh5x64-u12:26987] [ 0] /lib64/libpthread.so.0 [0x3ba680e7c0]
[rh5x64-u12:26987] [ 1] /lib64/libc.so.6(gsignal+0x35) [0x3ba5c30265]
[rh5x64-u12:26987] [ 2] /lib64/libc.so.6(abort+0x110) [0x3ba5c31d10]
[rh5x64-u12:26987] [ 3] /usr/lib64/libstdc++.so.6(_ZN9__gnu_cxx27__verbose_terminate_handlerEv+0x114) [0x3bb7abec44]
[rh5x64-u12:26987] [ 4] /usr/lib64/libstdc++.so.6 [0x3bb7abcdb6]
[rh5x64-u12:26987] [ 5] /usr/lib64/libstdc++.so.6 [0x3bb7abcde3]
[rh5x64-u12:26987] [ 6] /usr/lib64/libstdc++.so.6 [0x3bb7abceca]
[rh5x64-u12:26987] [ 7] /userdata/testing/benchmark_binaries/bin/boost_binomial_LB(_ZN5boost15throw_exceptionINS_3mpi9exceptionEEEvRKT_+0x172) [0x4216a2]
[rh5x64-u12:26987] [ 8] /usr/local/lib/libboost_mpi.so.1.42.0(_ZN5boost3mpi6detail19packed_archive_recvEP19ompi_communicator_tiiRNS0_15packed_iarchiveER20ompi_status_public_t+0x16b) [0x2b0317faa6b3]
[rh5x64-u12:26987] [ 9] /usr/local/lib/libboost_mpi.so.1.42.0(_ZNK5boost3mpi12communicator4recvINS0_15packed_iarchiveEEENS0_6statusEiiRT_+0x40) [0x2b0317f9c72a]
[rh5x64-u12:26987] [10] /usr/local/lib/libboost_mpi.so.1.42.0(_ZNK5boost3mpi12communicator4recvINS0_24packed_skeleton_iarchiveEEENS0_6statusEiiRT_+0x38) [0x2b0317f9c76c]
[rh5x64-u12:26987] [11] /userdata/testing/benchmark_binaries/bin/boost_binomial_LB(_ZNK5boost3mpi12communicator4recvI31Binomial_Option_Pricing_RequestEENS0_6statusEiiRKNS0_14skeleton_proxyIT_EE+0x121) [0x4258c1]
[rh5x64-u12:26987] [12] /userdata/testing/benchmark_binaries/bin/boost_binomial_LB(main+0x409) [0x41d369]
[rh5x64-u12:26987] [13] /lib64/libc.so.6(__libc_start_main+0xf4) [0x3ba5c1d994]
[rh5x64-u12:26987] [14] /userdata/testing/benchmark_binaries/bin/boost_binomial_LB(__gxx_personality_v0+0x399) [0x419e69]
[rh5x64-u12:26987] *** End of error message ***
[rh5x64-u11.zlab.local][[47840,1],0][btl_tcp_frag.c:216:mca_btl_tcp_frag_recv] mca_btl_tcp_frag_recv: readv failed: Connection reset by peer (104)
--------------------------------------------------------------------------
mpirun noticed that process rank 5 with PID 26987 on node 172.10.0.112 exited on signal 6 (Aborted).
--------------------------------------------------------------------------

Here is the program code:

#include <iostream>
#include <cstdlib>
#include <ctime>
#include <algorithm>
#include <numeric>
#include <functional>
#include <iomanip>
#include <cstdlib>
#include <cmath>
#include <limits>
#include <vector>
#include <sstream>
#include <fstream>
#include <streambuf>

#include <mpi.h>

#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/config.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/string.hpp>
#include <boost/mpi/skeleton_and_content.hpp>
#include <boost/mpi/datatype.hpp>
#include <boost/archive/tmpdir.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/mpi.hpp>
#include <boost/tokenizer.hpp>
#include <boost/archive/tmpdir.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>

#include "ace/OS_NS_sys_time.h"
#include "ace/OS_NS_time.h"
#include "ace/Profile_Timer.h"

using namespace MPI;
using std::scientific;
using namespace std;

namespace mpi = boost::mpi;

// Tag of the shutdown message the master sends to every slave once all
// replies are in. Request tags are request_index + 1, so they never use 0.
// A typed constant instead of a macro: it is scoped, type-checked and
// visible to a debugger.
const int STOPTAG = 0;

// Global output stream; it is not used by the code shown in this file —
// presumably opened/written elsewhere (NOTE(review): confirm or remove).
std::ofstream output_file;

// Driver for rank 0: issues n_calls pricing requests to the other ranks and
// collects their replies.
static void master (int &n_calls,
                    std::string &step_file_name,
                    std::string &byte_file_name,
                    mpi::communicator &world);

// Worker loop executed by every rank except 0.
static void slave (mpi::communicator &world);

/**
 * One American-call pricing job exchanged between the master and a slave.
 *
 * The master fills in the market inputs, n_steps and a payload of filler
 * bytes; the slave computes option_price and sends the whole object back.
 *
 * The default constructor zero-initialises every scalar member. Without it,
 * a default-initialised local (such as the shutdown message the master
 * sends) carries indeterminate values that are then read by serialization.
 */
struct Binomial_Option_Pricing_Request
{
        Binomial_Option_Pricing_Request ()
                : cur_stock_price (0.0),
                  strike_price (0.0),
                  risk_free_rate (0.0),
                  volatility (0.0),
                  t (0.0),
                  n_steps (0),
                  option_price (0.0),
                  payload ()
        {
        }

        double cur_stock_price;    // current price of the underlying stock
        double strike_price;
        double risk_free_rate;     // master uses 1.05 here
        double volatility;         // annualized
        double t;                  // time to expiry, in years
        int n_steps;               // number of steps in the binomial tree
        double option_price;       // result, written by the slave
        std::vector<char> payload; // not used by the pricing code; varies message size
};

namespace boost
{
        namespace serialization
        {
                template<class Archive>
                        void serialize (Archive &ar,
                                                  struct 
Binomial_Option_Pricing_Request &bopr,
                                                  unsigned int version)
                        {
                                ar & bopr.cur_stock_price;
                                ar & bopr.strike_price;
                                ar & bopr.risk_free_rate;
                                ar & bopr.volatility;
                                ar & bopr.t;
                                ar & bopr.n_steps;
                                ar & bopr.option_price;
                                ar & bopr.payload;
                        }
        }
}

// Binomial_Option_Pricing_Request is deliberately NOT declared with
// BOOST_IS_MPI_DATATYPE. That trait promises a fixed memory layout that a
// single MPI datatype can describe. Here, however, the std::vector<char>
// payload owns heap storage of varying length.
//
// With the trait set, Boost.MPI sends the whole object through one cached
// MPI datatype instead of calling serialize(). That also happens inside
// skeleton/content archives. Messages whose payload sizes differ then no
// longer match on the receiving side (observed as MPI_ERR_TRUNCATE).
//
// Leaving the trait false routes every send/recv of this type through the
// serialize() function above.

/**
 * Program entry point.
 *
 * Usage: <program> <n_calls> <step_file> <byte_file>
 *
 * Every rank reads the three arguments through read_input(). Rank 0 then
 * runs master() and all other ranks run slave().
 *
 * MPI start-up and shutdown are owned by the boost::mpi::environment object,
 * which finalizes MPI when it is destroyed. No explicit MPI_Finalize() call
 * is therefore needed; one would run before `world` and `env` are destroyed.
 *
 * @return 0 on success, 1 if the required arguments are missing.
 */
int
main (int argc, char **argv)
{
        mpi::environment env (argc, argv);
        mpi::communicator world;

        // argv[1..3] are read unconditionally below; with fewer arguments
        // they would be null or out of range.
        if (argc < 4)
        {
                if (world.rank () == 0)
                {
                        std::cerr << "usage: "
                                  << (argc > 0 ? argv[0] : "boost_binomial_LB")
                                  << " <n_calls> <step_file> <byte_file>"
                                  << std::endl;
                }
                return 1;
        }

        int n_calls = 0;
        std::string step_file_name;
        std::string byte_file_name;

        read_input (argv[1], &n_calls);
        read_input (argv[2], &step_file_name);
        read_input (argv[3], &byte_file_name);

        if (world.rank () == 0)
        {
                master (n_calls, step_file_name, byte_file_name, world);
        }
        else
        {
                slave (world);
        }

        return 0;
}

static void
master (int & n_calls,
                                std::string &step_file_name,
                                std::string &byte_file_name,
                                mpi::communicator &world)
{
        int n_tasks = world.size ();
        int rank;

        const double cur_stock_price = 110.0;
        const double strike_price = 100.0;
        const double risk_free_rate = 1.05; // Risk-free interest rate
        const double volatility = 0.15; // Annualized volatility
        const double t = 1.0; // In years


        int request_count = 0;
  int reply_count = 0;
        int vector_count = 0;

        std::vector<Binomial_Option_Pricing_Request> requests (n_calls);
        std::vector<Binomial_Option_Pricing_Request> replies(n_calls);

        for (std::vector<Binomial_Option_Pricing_Request>::iterator
                         it = requests.begin();
                         it != requests.end(); ++it)
        {
                Binomial_Option_Pricing_Request& r(*it);
                r.cur_stock_price = cur_stock_price;
                r.strike_price = strike_price;
                r.risk_free_rate = risk_free_rate;
                r.volatility = volatility;
                r.t = t;
                r.n_steps = step_vector[vector_count];

                // resize vector for sending heterogenous payload
                r.payload.resize (byte_vector[vector_count]);
                // Initialize payload
                std::for_each (r.payload.begin (),
                                                                         
r.payload.end (),
                                                                         
Initialize_Byte_Vector ());
                ++vector_count;
        }

        for (rank = 1; rank < n_tasks; ++rank)
        {
                // send Binomial_Option_Pricing_Request skeleton
                // resize vector for sending heterogenous payload
                world.send (rank,
                                        request_count + 1,
                                        mpi::skeleton(requests[request_count]));

                // send Binomial_Option_Pricing_Request data
                world.send (rank,
                                        request_count + 1,
                                        
mpi::get_content(requests[request_count]));

                std::cout << "Master: Sent initial message" << std::endl;
                requests[request_count].payload.resize(0);
                ++request_count;
        }
        while (request_count < n_calls)
        {

                Binomial_Option_Pricing_Request bopr_reply_data;

                mpi::status msg = world.probe ();

                // Receive reply skeleton
world.recv (msg.source (), msg.tag (), mpi::skeleton (bopr_reply_data));

std::cout << "Master: Received reply skeleton from:"<<msg.source()<<"for"<<msg.tag ()<<std::endl;
                // Receive reply
world.recv (msg.source (), msg.tag (), mpi::get_content (bopr_reply_data));

                std::cout << "Master: Received reply from:"<<msg.source()<<"for 
"<<
                        msg.tag() << std::endl;
                bopr_reply_data.payload.resize (0);
                replies.push_back (bopr_reply_data);
                ++reply_count;

                world.send (msg.source (),
                                                                request_count + 
1,
                                                                
mpi::skeleton(requests[request_count]));

std::cout << "Master:Sent message skeleton to :"<<msg.source()<<std::endl;
                // send Binomial_Option_Pricing_Request data
                world.send (msg.source (),
                                                                request_count + 
1,
                                                                
mpi::get_content(requests[request_count]));

                std::cout << "Master:Sent message to 
:"<<msg.source()<<std::endl;
                //requests[request_count].payload.resize(0);
                ++request_count;
                // store reply
        }

        while (reply_count < n_calls)
        {
                std::cout <<" Master Inside final loop" <<std::endl;
                Binomial_Option_Pricing_Request bopr_reply_data;

                mpi::status msg = world.probe ();

                // Receive reply skeleton
world.recv (msg.source (), msg.tag (), mpi::skeleton (bopr_reply_data));
                // Receive reply
world.recv (msg.source (), msg.tag (), mpi::get_content (bopr_reply_data));
                bopr_reply_data.payload.resize (0);
                // store reply
                replies.push_back (bopr_reply_data);
                ++reply_count;
        }

        for (int rank = 1; rank < n_tasks; ++rank)
        {
                Binomial_Option_Pricing_Request bopr_stop_data;
                world.send (rank, STOPTAG, bopr_stop_data);
        }
}

/**
 * Worker loop run by every rank other than 0.
 *
 * Waits for the next message from the master (rank 0):
 *  - A message tagged STOPTAG ends the loop.
 *  - Any other tag carries one request as a skeleton followed by its
 *    content. The slave prices the option and sends the updated request back
 *    to the master under the same tag, again as skeleton + content.
 *
 * Replies use blocking sends. Both the packed skeleton archive and
 * bopr_call_data (destroyed at the end of every iteration) must stay alive
 * until MPI has finished with them. A non-blocking isend() whose request is
 * discarded without wait() gives no such guarantee.
 */
static void
slave (mpi::communicator &world)
{
        const int my_rank = world.rank ();
        int count = 0;

        while (true)
        {
                Binomial_Option_Pricing_Request bopr_call_data;

                // Only the master sends to slaves; match the recv(0, ...) calls.
                mpi::status msg = world.probe (0, mpi::any_tag);
                if (msg.tag () == STOPTAG)
                {
                        // NOTE(review): the STOPTAG message itself is never
                        // received; consider draining it once its wire format
                        // is settled on both sides.
                        break;
                }

                world.recv (0, msg.tag (), mpi::skeleton (bopr_call_data));
                std::cout << "Slave " << my_rank << ":Received Skeleton: " << msg.tag () << std::endl;

                world.recv (0, msg.tag (), mpi::get_content (bopr_call_data));
                std::cout << "Slave " << my_rank << ":Received Payload: " << msg.tag () << std::endl;

                bopr_call_data.option_price =
                        option_price_call_american_binomial (bopr_call_data.cur_stock_price,
                                                             bopr_call_data.strike_price,
                                                             bopr_call_data.risk_free_rate,
                                                             bopr_call_data.volatility,
                                                             bopr_call_data.t,
                                                             bopr_call_data.n_steps);

                world.send (0, msg.tag (), mpi::skeleton (bopr_call_data));
                std::cout << "Slave " << my_rank << ": Sent Response Skeleton: " << msg.tag () << std::endl;

                world.send (0, msg.tag (), mpi::get_content (bopr_call_data));
                std::cout << "Slave " << my_rank << ": Sent Response Payload: " << msg.tag () << std::endl;

                ++count;
        }

        std::cout << "Slave: " << my_rank << " : "
                  << "Number of requests processed: " << count << std::endl;
}

Thanks,
Pooja


Reply via email to