Hello,
I'm running an MPI program that uses passive-target RMA to access shared arrays.
On some systems this program does not behave as expected.
When running on several nodes it still produces correct results, but only the
process with rank 0 (the one holding the shared arrays in its local memory)
actually works on the shared arrays, which is not the intended behavior.
This happens with Open MPI 4, specifically with versions 4.0.5 and 4.1.4.
However, when compiling and running with Open MPI 3 (specifically 3.1.4), the
program works as expected and all processes work on the shared structures.
In addition, when Open MPI 4 is built to use verbs instead of UCX, the program
also works as expected.
We have therefore concluded that there may be a problem related to the use of
UCX in Open MPI.
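For reference, here is a minimal sketch (separate from the attached reproducer)
of how one can confirm at run time which Open MPI build a binary is actually
running against, using the standard MPI_Get_library_version call:

  #include <mpi.h>
  #include <cstdio>
  int main(int argc, char **argv) {
      MPI_Init(&argc, &argv);
      char version[MPI_MAX_LIBRARY_VERSION_STRING];
      int len = 0;
      MPI_Get_library_version(version, &len);   // e.g. "Open MPI v4.1.4, ..."
      int rank = 0;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      if (rank == 0) std::printf("%s\n", version);
      MPI_Finalize();
      return 0;
  }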
About the system I am working on:
- Nodes on the system are connected through an InfiniBand FDR network.
- The compiler is g++ (GCC) 8.3.0, used with the different Open MPI versions
mentioned above.
I attach a sample code that reproduces the undesired behavior.
I also include the output of the test program (1) when it misbehaves and
(2) when it behaves properly.
Can someone help me understand whether the problem is in the program or in
Open MPI and UCX?
Thanks a lot!
(1) Output when misbehaving:
+--------------------------------------------------+
Rank: 0 ||| Position: 0
Rank: 0 ||| Position: 10000
Rank: 0 ||| Position: 20000
...
Rank: 0 ||| Position: 850000
Rank: 0 ||| Position: 860000
Rank: 0 ||| Position: 870000
...
Rank: 0 ||| Position: 19970000
Rank: 0 ||| Position: 19980000
Rank: 0 ||| Position: 19990000
*****
*****
*****
*****
***** Small correctness check *****
Position 0
||| Input value: 0
||| Output value: 0.00
||| Expected output: 0.00
...
Position 19999999
||| Input value: 19999999
||| Output value: 49999997.50
||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 2000
Process 1 accesses: 0
Process 2 accesses: 0
Process 3 accesses: 0
Process 4 accesses: 0
Process 5 accesses: 0
Process 6 accesses: 0
Process 7 accesses: 0
+--------------------------------------------------+
(2) Output when behaving properly:
+--------------------------------------------------+
Rank: 0 ||| Position: 70000
Rank: 0 ||| Position: 80000
Rank: 0 ||| Position: 90000
...
Rank: 3 ||| Position: 240000
Rank: 4 ||| Position: 280000
Rank: 7 ||| Position: 190000
...
Rank: 3 ||| Position: 19760000
Rank: 2 ||| Position: 19850000
Rank: 6 ||| Position: 19940000
*****
*****
*****
*****
***** Small correctness check *****
Position 0
||| Input value: 0
||| Output value: 0.00
||| Expected output: 0.00
...
Position 19999999
||| Input value: 19999999
||| Output value: 49999997.50
||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 425
Process 1 accesses: 226
Process 2 accesses: 222
Process 3 accesses: 226
Process 4 accesses: 228
Process 5 accesses: 227
Process 6 accesses: 222
Process 7 accesses: 224
+--------------------------------------------------+
#include <iostream>
#include <numeric>
#include <stdexcept>
#include <iomanip>
#include <cstdlib>   // EXIT_SUCCESS
#include <vector>
using std::cout;
using std::endl;
// MPI added
#include <mpi.h>
#define MPI_RANK_0 0
#define MULT_FACTOR 2.5
static void
process_data(int *input_buffer,
             double *output_buffer,
             size_t BLOCK_SIZE) {
    for (size_t i = 0; i < BLOCK_SIZE; i++) {
        output_buffer[i] = (double) input_buffer[i] * MULT_FACTOR;
    }
}
int
main(int argc, char **argv) {
    int rank, number_of_processes;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &number_of_processes);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    const size_t VECTOR_SIZE = 20000000;
    const size_t MY_SIZE = rank ? 0 : VECTOR_SIZE;
    // Non-root ranks expose zero-size windows, so keep these pointers null there
    int *main_input_buffer = nullptr;
    double *main_output_buffer = nullptr;
    // Rank 0 has the input data
    if (rank == MPI_RANK_0) {
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(int), MPI_INFO_NULL, &main_input_buffer);
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(double), MPI_INFO_NULL, &main_output_buffer);
        for (size_t i = 0; i < VECTOR_SIZE; i++) {
            main_input_buffer[i] = (int)i;
        }
    }
    // We will create a shared index to access shared data on Rank 0
    // Also, we will share input and output buffers on P0
    size_t *main_buffer_index;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &main_buffer_index);
    *main_buffer_index = 0;
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Win index_window, input_window, output_window;
    MPI_Win_create(main_buffer_index, 1 * sizeof(size_t), sizeof(size_t),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &index_window);
    MPI_Win_create(main_input_buffer,
                   MY_SIZE * sizeof(int),
                   sizeof(int),
                   MPI_INFO_NULL,
                   MPI_COMM_WORLD,
                   &input_window);
    MPI_Win_create(main_output_buffer,
                   MY_SIZE * sizeof(double),
                   sizeof(double),
                   MPI_INFO_NULL,
                   MPI_COMM_WORLD,
                   &output_window);
    // Store info about window access
    int times_accessed = 0;
    std::vector<int> times_accessed_per_process(number_of_processes);
    //********** IDEA OF THE PROGRAM **********//
    // Get an index from shared index window.
    // Read input buffer using that index
    // Process a block of data
    // Write the results back
    const size_t BLOCK_SIZE = 10000;
    size_t *current_position;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &current_position);
    int *tmp_input_buffer;
    double *tmp_output_buffer;
    MPI_Alloc_mem(BLOCK_SIZE * sizeof(int), MPI_INFO_NULL, &tmp_input_buffer);
    MPI_Alloc_mem(BLOCK_SIZE * sizeof(double), MPI_INFO_NULL, &tmp_output_buffer);
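    // Note: MPI_AINT is used as the datatype in the fetch-and-add calls below
    // while BLOCK_SIZE and *current_position are size_t; this assumes MPI_Aint
    // and size_t have the same width (true on typical LP64 Linux systems).
    // An exact-width pair such as uint64_t with MPI_UINT64_T would avoid that
    // assumption.
    // The shared lock is deliberate: MPI_Fetch_and_op is an accumulate-style
    // operation, and the MPI standard makes accumulate operations on the same
    // window location atomic with respect to each other, so an exclusive lock
    // should not be required for the counter update.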
    // Get initial index
    // And increase index 10000 units for the next one to come
    MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
    MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT,
                     MPI_RANK_0, 0, MPI_SUM, index_window);
    MPI_Win_unlock(MPI_RANK_0, index_window);
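    // *current_position now holds the counter value from before the increment,
    // so this rank has claimed the disjoint block
    // [*current_position, *current_position + BLOCK_SIZE) for itself.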
    while (*current_position < VECTOR_SIZE) {
        // Get 10000 ints from Rank 0
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0,
                     0, input_window);
        MPI_Get(tmp_input_buffer, BLOCK_SIZE, MPI_INT,
                MPI_RANK_0, *current_position, BLOCK_SIZE,
                MPI_INT, input_window);
        MPI_Win_unlock(MPI_RANK_0, input_window);
        // Process data
        process_data(tmp_input_buffer, tmp_output_buffer, BLOCK_SIZE);
        // Write results to output buffer
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, output_window);
        MPI_Put(tmp_output_buffer, BLOCK_SIZE, MPI_DOUBLE,
                MPI_RANK_0, *current_position, BLOCK_SIZE,
                MPI_DOUBLE, output_window);
        MPI_Win_unlock(MPI_RANK_0, output_window);
        // Print data processed
        cout << "Rank: " << rank << " ||| Position: " << *current_position << endl;
        // Store your access
        times_accessed += 1;
        // Get next position
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
        MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT,
                         MPI_RANK_0, 0, MPI_SUM, index_window);
        MPI_Win_unlock(MPI_RANK_0, index_window);
    }
    // When all has been processed, free windows
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Win_free(&index_window);
    MPI_Win_free(&input_window);
    MPI_Win_free(&output_window);
    // Collect access information
    MPI_Gather(&times_accessed, 1, MPI_INT,
               times_accessed_per_process.data(), 1, MPI_INT,
               MPI_RANK_0, MPI_COMM_WORLD);
    // Print first and last positions for correctness
    // Also print access data
    if (rank == MPI_RANK_0) {
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Small correctness check *****" << endl;
        cout << std::fixed;
        cout << std::setprecision(2);
        for (size_t i = 0; i < 10; i++)
            cout << "Position " << i << endl
                 << "\t||| Input value: " << main_input_buffer[i] << endl
                 << "\t||| Output value: " << main_output_buffer[i] << endl
                 << "\t||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;
        cout << "..." << endl;
        for (size_t i = VECTOR_SIZE - 10; i < VECTOR_SIZE; i++)
            cout << "Position " << i << endl
                 << "\t||| Input value: " << main_input_buffer[i] << endl
                 << "\t||| Output value: " << main_output_buffer[i] << endl
                 << "\t||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Accesses per process data *****" << endl;
        for (int i = 0; i < number_of_processes; i++)
            cout << "Process " << i << " accesses: " << times_accessed_per_process[i] << endl;
    }
    // Free memory
    MPI_Free_mem(main_buffer_index);
    MPI_Free_mem(current_position);
    MPI_Free_mem(tmp_input_buffer);
    MPI_Free_mem(tmp_output_buffer);
    if (rank == MPI_RANK_0) {
        MPI_Free_mem(main_input_buffer);
        MPI_Free_mem(main_output_buffer);
    }
    MPI_Finalize();
    return EXIT_SUCCESS;
}