Here's the program.
It should print output like this:
[1 communicating threads]
[0] 1 2.484936 0.402 0.384
[0] 2 2.478036 0.807 0.770
[0] 4 2.501503 1.599 1.525
[0] 8 2.497516 3.203 3.055
thread #1
[2 communicating threads]
[0] 1 3.970628 0.252 0.240
[1] 1 3.929280 0.254 0.243
[1] 2 4.087206 0.489 0.467
[0] 2 5.181758 0.386 0.368
[1] 4 3.715222 1.077 1.027
[0] 4 4.358013 0.918 0.875
[1] 8 4.166852 1.920 1.831
[0] 8 3.628287 2.205 2.103
thread #2
[3 communicating threads]
[0] 1 5.922241 0.169 0.161
[2] 1 6.896299 0.145 0.138
[1] 1 8.973834 0.111 0.106
...
Francois
George Bosilca wrote:
I will take a look at the BTL problem. Can you provide a copy of the
benchmarks please.
Thanks,
george.
On Jun 11, 2009, at 16:05 , François Trahay wrote:
concurrent_ping
_______________________________________________
users mailing list
us...@open-mpi.org
http://www.open-mpi.org/mailman/listinfo.cgi/users
/*
* NewMadeleine
* Copyright (C) 2006 (see AUTHORS file)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or (at
* your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <sched.h>
#include "mpi.h"
/* This program performs several ping-pongs in parallel.
 * It evaluates the efficiency of accessing nmad from 1, 2, 3, ... n threads simultaneously.
 */
#define LEN_DEFAULT 4             /* default for -L (parsed but currently unused) */
#define WARMUPS_DEFAULT 1000      /* untimed iterations per length before measuring */
#define LOOPS_DEFAULT 10000       /* timed ping-pong iterations per length */
#define THREADS_DEFAULT 16        /* default number of communicating threads (-T) */
#define DATA_CONTROL_ACTIVATED 0  /* set to 1 to verify buffer contents after each transfer */
static int comm_rank = -1;        /* this process's rank in MPI_COMM_WORLD */
static int comm_size = -1;        /* size of MPI_COMM_WORLD (benchmark assumes 2 ranks) */
static char host_name[1024] = ""; /* NOTE(review): never written or read in this file */
static int max_len = 16;          /* lengths sweep 1,2,4,... while len < max_len */
static int loops;                 /* timed iterations per message length */
static int warmups;               /* warm-up iterations per message length */
static int threads;               /* total number of communicating threads */
static sem_t ready_sem;           /* posted by each thread when it finishes a round */
static int go;                    /* round gate: threads spin until go reaches their round index.
                                   * NOTE(review): plain int shared across threads without
                                   * atomics/locks — a data race; confirm intent. */
/* Compute the next message length in the progression:
 * len -> len * multiplier + increment, seeded with 1 + increment
 * when len is still zero. */
static __inline__
uint32_t _next(uint32_t len, uint32_t multiplier, uint32_t increment)
{
  return len ? len * multiplier + increment
             : 1 + increment;
}
/* Print the supported command-line options and their defaults on stderr. */
void usage_ping() {
  fprintf(stderr,
          "-L len - packet length [%d]\n"
          "-N iterations - iterations [%d]\n"
          "-T thread - number of communicating threads [%d]\n"
          "-W warmup - number of warmup iterations [%d]\n",
          LEN_DEFAULT, LOOPS_DEFAULT, THREADS_DEFAULT, WARMUPS_DEFAULT);
}
/* Fill the first len bytes of buffer with the repeating pattern 'a'..'z'.
 * A non-positive len writes nothing.
 * Fix: the original used `unsigned int i` against the signed `len`; the
 * comparison promoted a negative len to a huge unsigned bound and wrote
 * far out of bounds.  Using a signed index makes negative lengths a no-op. */
static void fill_buffer(char *buffer, int len) {
  int i;
  for (i = 0; i < len; i++) {
    buffer[i] = (char)('a' + (i % 26));
  }
}
/* Zero the first len bytes of buffer. */
static void clear_buffer(char *buffer, int len) {
  memset(buffer, 0, len);
}
#if DATA_CONTROL_ACTIVATED
/* Verify that the first len bytes of buffer hold the 'a'..'z' pattern
 * written by fill_buffer(); msg labels the check in the failure report
 * (e.g. "sending"/"received").  Aborts via TBX_FAILURE() on corruption.
 * Only compiled when DATA_CONTROL_ACTIVATED is non-zero; depends on the
 * tbx_* helpers from NewMadeleine, which are not visible in this file.
 * NOTE(review): printf uses %d for the unsigned i — harmless for small
 * lengths but technically mismatched; confirm before enabling. */
static void control_buffer(char *msg, char *buffer, int len) {
tbx_bool_t ok = tbx_true;
unsigned char expected_char;
unsigned int i = 0;
for(i = 0; i < len; i++){
expected_char = 'a'+(i%26);
if(buffer[i] != expected_char){
printf("Bad data at byte %d: expected %c, received %c\n",
i, expected_char, buffer[i]);
ok = tbx_false;
}
}
if (!ok) {
printf("Controling %s - ", msg);
printf("%d bytes reception failed\n", len);
TBX_FAILURE("data corruption");
} else {
printf("ok\n");
}
}
#endif
/* Thread body for the echo side (run on rank 0): receive each message
 * and send it straight back to the peer rank, using the thread's index
 * as the MPI tag so concurrent ping-pongs stay separated.
 *
 * arg smuggles the thread's 0-based index through the pointer value.
 * Thread t participates in rounds t..threads: it waits for the shared
 * 'go' gate to reach the round number, serves one full length sweep
 * (len = 1, 2, 4, ... while len < max_len; loops + warmups iterations
 * each), then posts ready_sem so main() can start the next round.
 *
 * Fixes: return type changed from void to void * (calling a void
 * function through pthread's void *(*)(void *) pointer is UB); the
 * pointer-to-uint8_t cast goes through uintptr_t; the unused
 * MPI_Request and the leak of buf are removed; malloc is checked.
 */
void *
server(void* arg) {
  int my_pos = (int)(uintptr_t)arg;
  uint8_t tag = (uint8_t)(uintptr_t)arg;
  char *buf = NULL;
  int i, k;
  int len;
  buf = malloc(max_len);
  if (buf == NULL) {
    fprintf(stderr, "server thread #%d: malloc(%d) failed\n", my_pos, max_len);
    return NULL;
  }
  clear_buffer(buf, max_len);
  for(i = my_pos; i <= threads; i++) {
    /* Be sure all the communicating threads have been created before we start */
    while(go < i)
      sched_yield();
    for(len=1; len < max_len; len*=2){
      for(k = 0; k < loops + warmups; k++) {
        MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
#if DATA_CONTROL_ACTIVATED
        control_buffer("received", buf, len);
#endif
        MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);
      }
    }
    sem_post(&ready_sem);
  }
  free(buf);
  return NULL;
}
/* Thread body for the measuring side (run on ranks != 0): drive the
 * ping-pong and print one result line per message length:
 *   [thread] len latency(us) MB/s(1e6) MiB/s
 *
 * arg smuggles the thread's 0-based index through the pointer value;
 * it doubles as the MPI tag.  Thread t participates in rounds
 * t..threads, gated by the shared 'go' counter exactly like server().
 * Each length does 'warmups' untimed round-trips, then 'loops' timed
 * ones; latency is half the mean round-trip time.
 *
 * Fixes: the original called fill_buffer(buf, len) with len still
 * UNINITIALIZED (undefined behavior) — the whole buffer is filled
 * instead; return type changed from int (which never returned a
 * value) to the proper pthread void * signature; pointer cast goes
 * through uintptr_t; unused MPI_Request declarations removed; malloc
 * checked and buf freed.
 */
void *
client(void *arg) {
  int my_pos = (int)(uintptr_t)arg;
  uint8_t tag = (uint8_t)my_pos;
  char *buf = NULL;
  double t1, t2;
  double sum, lat, bw_million_byte, bw_mbyte;
  int i, k;
  int len;
  fprintf(stderr, "thread #%d\n", my_pos);
  buf = malloc(max_len);
  if (buf == NULL) {
    fprintf(stderr, "client thread #%d: malloc(%d) failed\n", my_pos, max_len);
    return NULL;
  }
  clear_buffer(buf, max_len);
  fill_buffer(buf, max_len);
  for(i = my_pos; i <= threads; i++) {
    /* Be sure all the communicating threads have been created before we start */
    while(go < i)
      sched_yield();
    for(len=1; len < max_len; len*=2){
      for(k = 0; k < warmups; k++) {
#if DATA_CONTROL_ACTIVATED
        control_buffer("sending", buf, len);
#endif
        MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);
        MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
#if DATA_CONTROL_ACTIVATED
        control_buffer("received", buf, len);
#endif
      }
      t1 = MPI_Wtime();
      for(k = 0; k < loops; k++) {
#if DATA_CONTROL_ACTIVATED
        control_buffer("sending", buf, len);
#endif
        MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);
        MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
#if DATA_CONTROL_ACTIVATED
        control_buffer("received", buf, len);
#endif
      }
      t2 = MPI_Wtime();
      sum = (t2 - t1)*1e6;                     /* total time in microseconds */
      lat = sum / (2 * loops);                 /* one-way latency */
      bw_million_byte = len * (loops / (sum / 2));
      bw_mbyte = bw_million_byte / 1.048576;   /* 10^6 bytes/s -> MiB/s */
      printf("[%d]\t%d\t%lf\t%8.3f\t%8.3f\n", my_pos, len, lat, bw_million_byte, bw_mbyte);
      fflush(stdout);
    }
    sem_post(&ready_sem);
  }
  free(buf);
  return NULL;
}
/* Entry point: initialize MPI with full thread support, parse options,
 * then spawn one communicating thread per round — rank 0 runs server
 * threads, the other rank runs client threads.  Rounds are paced by
 * the 'go' gate and 'ready_sem': after creating thread i, main waits
 * for the i+1 participating threads to post ready_sem, then advances
 * 'go' so they all enter the next round together.
 *
 * Fixes: thread index is passed as (void *)(uintptr_t)i instead of the
 * invalid (uint8_t)i-as-pointer; option parsing checks that a value
 * follows each flag (the original fed NULL to atoi on a trailing
 * option); unused 'bourrin_ready' removed; malloc checked; pid,
 * ready_sem and attr are released before exit.
 */
int
main(int argc,
     char **argv) {
  int i, j;
  pthread_t * pid;
  pthread_attr_t attr;
  int provided;
  int needed = MPI_THREAD_MULTIPLE;
  loops = LOOPS_DEFAULT;
  threads = THREADS_DEFAULT;
  warmups = WARMUPS_DEFAULT;
  MPI_Init_thread(&argc, &argv, needed, &provided);
  if(provided < needed){
    fprintf(stderr, "needed: %d, provided: %d\n", needed, provided);
    exit(0);
  }
  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
  if (argc > 1 && !strcmp(argv[1], "--help")) {
    usage_ping();
    exit(0);
  }
  for(i=1 ; i<argc ; i+=2) {
    if (i+1 >= argc) {
      /* flag with no value: the original passed NULL to atoi */
      fprintf(stderr, "Missing value for %s\n", argv[i]);
      usage_ping();
      exit(0);
    }
    if (!strcmp(argv[i], "-N")) {
      loops = atoi(argv[i+1]);
    }
    else if (!strcmp(argv[i], "-L")) {
      /* accepted for compatibility; packet length is fixed (max_len) */
    }
    else if (!strcmp(argv[i], "-T")) {
      threads = atoi(argv[i+1]);
    }
    else if (!strcmp(argv[i], "-W")) {
      warmups = atoi(argv[i+1]);
    }
    else {
      fprintf(stderr, "Illegal argument %s\n", argv[i]);
      usage_ping();
      exit(0);
    }
  }
  pthread_attr_init(&attr);
  pid = malloc(sizeof(pthread_t) * threads);
  if (pid == NULL) {
    fprintf(stderr, "malloc of %d thread ids failed\n", threads);
    MPI_Finalize();
    exit(1);
  }
  sem_init(&ready_sem, 0, 0);
  go = 0;
  for (i = 0 ; i < threads ; i++) {
    printf("[%d communicating threads]\n", i+1);
    if (comm_rank == 0) {
      pthread_create(&pid[i], &attr, (void *(*)(void *))server, (void *)(uintptr_t)i);
    } else {
      pthread_create(&pid[i], &attr, (void *(*)(void *))client, (void *)(uintptr_t)i);
    }
    /* Wait until every thread of this round has finished its sweep. */
    for (j = 0; j <= i; j++) {
      sem_wait(&ready_sem);
      go = j;
    }
    go++;   /* release all threads into the next round */
  }
  for (i = 0; i < threads; i++)
    pthread_join(pid[i], NULL);
  free(pid);
  sem_destroy(&ready_sem);
  pthread_attr_destroy(&attr);
  MPI_Finalize();
  exit(0);
}