"Tom Lane" <[EMAIL PROTECTED]> writes:

> Gregory Stark <[EMAIL PROTECTED]> writes:
>> "Tom Lane" <[EMAIL PROTECTED]> writes:
>>> Seems to me this proves nothing much, since it doesn't use the same SysV
>>> semaphore API PG does.
>
>> I was trying to copy the semaphore API exactly assuming
>> USE_NAMED_POSIX_SEMAPHORES was *not* defined. According to the comments we
>> prefer not to use named semaphores if possible.
>
> What you seem to have copied is the posix_sema.c code, which AFAIK is
> only used on Darwin.  sysv_sema.c is what to look at ... unless your
> benchmark machine is a Mac.

I switched the code over to the sysv_sema style api. It's gotten a bit grotty
and I would clean it up if it weren't a temporary test program. If we find a
real problem perhaps I should add a test case like this to the "smoke test" in
ipc_test.c so people can check their OS. 

I did add something like the setitimer deadlock timeout to detect a process
stuck waiting. There is a race condition there if a process is woken up just
as the timer fires but if the timeout is large enough the chances of that are
pretty remote. Judging by the first thread the whole loop excluding the usleep
takes about 3ms. I've been using a timeout of 10 seconds. As such:

$ ./a.out 40 900 10
running with 40 processes for 900s with timeout of 10000ms
telling threads to exit
run done
cleaning up semaphores and shared memory

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/time.h>

union semun {
  int              val;    /* Value for SETVAL */
  struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
  unsigned short  *array;  /* Array for GETALL, SETALL */
  struct seminfo  *__buf;  /* Buffer for IPC_INFO (Linux specific) */
};

#define SEMAS_PER_SET	16
#define IPCProtection	(0600)	/* access/modify by user only */
#define PGSemaMagic		537		/* must be less than SEMVMX */
#define SEMAS_KEY_START	(5431*1000-1)

int nthreads, timeout, shmid;
volatile unsigned char *wakers;
typedef struct PGSemaphoreData
{
	int			semId;			/* semaphore set identifier */
	int			semNum;			/* semaphore number within set */
} PGSemaphoreData;
PGSemaphoreData *sems;

static void atexit_handler();
static void worker(int n);

static void down(int n);
static void up(int n);

#define MAX_THREADS 250
#define WAKER_NOOP 253
#define WAKER_EXIT 254
#define WAKER_RUNNABLE 255

/* this just forces the atexit handler to be called */
static void handle_sig(int arg) {exit(127+arg);}

int main(int argc, char *argv[])
{
  int i, semKey, runtime;
  pid_t *pids;
  struct sigaction act, oact;
  int semId=-1;

  if (argc <= 1)
    nthreads = 10;
  else 
    nthreads = atoi(argv[1]);

  if (nthreads <= 0 || nthreads > MAX_THREADS) {
    fprintf(stderr, "usage: nthreads not between 1 and %d\n", MAX_THREADS);
    exit(1);
  }

  if (argc <= 2)
    runtime = 10;
  else
    runtime = atoi(argv[2]);

  if (runtime < 1) {
    fprintf(stderr, "usage: runtime shorter than 1s\n");
    exit(1);
  }

  if (argc <= 3)
    timeout = 1000*60;
  else
    timeout = 1000.0*atof(argv[3]);

  if (timeout < 1) {
    fprintf(stderr, "usage: timeout shorter than 1s\n");
    exit(1);
  }
  
  printf("running with %d processes for %ds with timeout of %dms\n", nthreads, runtime, timeout);
  sems = malloc(sizeof(*sems)*nthreads);

  semKey = SEMAS_KEY_START;
  for (i=0;i<nthreads;i++) {
    union semun semun;
    int semNum = i % SEMAS_PER_SET;

    if (semNum == 0) {
      semKey += 1;
      semId = semget(semKey, SEMAS_PER_SET, IPC_CREAT | IPC_EXCL | IPCProtection);
      if (semId < 0) {
	perror("semget");
	exit(1);
      }
    }

    semun.val = 0;
    if (semctl(semId, semNum, SETVAL, semun) < 0) {
      fprintf(stderr, "semctl(%d, %d, SETVAL, 0): %s\n", semId, semNum, strerror(errno));
      exit(1);
    }

    if (semId<0 || semNum > SEMAS_PER_SET)
      exit(1);

    sems[i].semId = semId;
    sems[i].semNum= semNum;
  }
  
  shmid = shmget(IPC_PRIVATE, nthreads*sizeof(unsigned char), IPC_CREAT | IPC_EXCL | IPCProtection);

  if (shmid == -1) {
    perror("shmget");
    exit(1);
  }


  wakers = shmat(shmid, NULL, 0);
  wakers[0] = WAKER_NOOP;
  for (i=1;i<nthreads;i++)
    wakers[i] = WAKER_RUNNABLE;

  pids = malloc(sizeof(pid_t)*nthreads);

  for (i=0;i<nthreads;i++) {
    /*printf("forking thread %d\n", i);*/
    switch(pids[i] = fork()) {
    case 0:
      worker(i);
      exit(0);
    case -1:
      perror("fork");
      exit(1);
    default:
      /*printf("successfully forked thread %d as pid %d\n", i, pids[i]);*/
      break;
    }
  }

  act.sa_handler = handle_sig;
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  if (sigaction(SIGINT, &act, &oact) < 0)
    perror("sigaction");
  atexit(atexit_handler);

  sleep(runtime);
  printf("telling threads to exit\n");

  for (i=0;i<nthreads;i++) {
    while (wakers[i] == WAKER_RUNNABLE) {
      printf("still waiting for thread %d to block\n", i);
      sleep(1);
    }

    /*printf("telling thread %d to exit\n", i);*/
    wakers[i] = WAKER_EXIT;
    up(i);
    usleep(20000);
    
    while(wakers[i] == WAKER_EXIT) {
      printf("still waiting for thread %d to exit\n", i);
      sleep(1);
    }
  }

  printf("run done\n");
  
  exit(0);
}

void static atexit_handler()
{
  int i;

  printf("cleaning up semaphores and shared memory\n");

  for (i=1;i<nthreads;i++)
    wakers[i] = WAKER_EXIT;

  if (shmctl(shmid, IPC_RMID, NULL) < 0)
    perror("shmctl ipc_rmid");

  for(i=0; i<nthreads; i += SEMAS_PER_SET) {
    union semun semun;
    semun.val = 0;
    if (semctl(sems[i].semId, 0, IPC_RMID, semun) < 0)
      fprintf(stderr, "semctl(sems[%d].semId==%d, 0, IPCS_RMID, {0}: %s\n", i, sems[i].semId, strerror(errno));
  }
}



int MyThread;
volatile int sigalarm_fired, sigalarm_found_myself_runnable;

static void
handle_sig_alarm(int arg)
{
  int waker = wakers[MyThread];

  sigalarm_fired = 1;

  if (waker == WAKER_RUNNABLE)
    sigalarm_found_myself_runnable = 1;
}

static void worker(int n)
{
  long niterations=0;
  struct itimerval timeval;
  struct sigaction act, oact;

  srandom(getpid());
  MyThread = n;

  act.sa_handler = handle_sig_alarm;
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  if (sigaction(SIGALRM, &act, &oact) < 0)
    perror("sigaction");

  for(;;) {
    int waker;
    int i;

    /* wake anyone following us waiting for us to wake them */
    for (i=n+1;i<nthreads;i++) {
      if (wakers[i] == n) {
	/*printf("thread %d waking thread %d\n", n, i);*/
	wakers[i] = WAKER_RUNNABLE;
	up(i);
      }
    }
    
    niterations++;
    if (wakers[n] == WAKER_EXIT) {
      /*printf("thread %d exiting after %ld iterations\n", n, niterations);*/
      wakers[n] = WAKER_NOOP;
      exit(0);
    }
  
    if (n == 0) {
      /* we're the first thread so we just sleep and then go around waking
	 people again */
      usleep(10000);
      continue;
    }

    /* otherwise pick a random thread earlier than us to wake us and go to
       sleep until awoken by it */

    sigalarm_fired = 0;
    sigalarm_found_myself_runnable = 0;
    memset(&timeval, 0, sizeof(struct itimerval));
    timeval.it_value.tv_sec  = timeout / 1000;
    timeval.it_value.tv_usec = (timeout % 1000) * 1000;
    if (setitimer(ITIMER_REAL, &timeval, NULL)) {
      perror("setitimer");
      exit(1);
    }
    waker = random()%n;
    /*printf("thread %d sleeping waiting for %d to wake us\n", n, waker);*/
    wakers[n] = waker;
    down(n);

    if ((waker = wakers[n]) <= MAX_THREADS) {
      printf("thread %d awake but waker is still set to %d !!!!\n", n, waker);
      exit(1);
    }

    memset(&timeval, 0, sizeof(struct itimerval));
    if (setitimer(ITIMER_REAL, &timeval, NULL)) {
      perror("setitimer");
      exit(1);
    }

    /*
    if (sigalarm_fired)
      printf("timer fired\n");
    */
    if (sigalarm_found_myself_runnable)
      printf("thread %d lost a wakeup!!!\n", n);

    /*printf("thread %d awoken\n", n);*/
  }
}

static void mysemop(int n, int op)
{
  struct sembuf sops;
  int errstatus;

  sops.sem_op = op;
  sops.sem_flg = 0;
  sops.sem_num = sems[n].semNum;
  do {
    errstatus = semop(sems[n].semId, &sops, 1);
  } while (errstatus < 0 && errno == EINTR);

  if (errstatus < 0) {
    fprintf(stderr, "semop(%d, {%d,0,%d}, 1): %s\n", sems[n].semId, op, sems[n].semNum, strerror(errno));
    exit(1);
  }
}

static void up(int n)
{
  return mysemop(n,1);
}

static void down(int n)
{
  return mysemop(n,-1);
}




-- 
  Gregory Stark
  EnterpriseDB          http://www.enterprisedb.com
---------------------------(end of broadcast)---------------------------
TIP 1: if posting/reading through Usenet, please send an appropriate
       subscribe-nomail command to [EMAIL PROTECTED] so that your
       message can get through to the mailing list cleanly

Reply via email to