Hi,

there is some good news from the parallel front. I have replaced
the semaphores that control the fork() and join() of threads with a new,
busy-wait-based mechanism.

That has reduced the fork() and join() times considerably: from the 20,000-cycle
ballpark to less than 1,000 cycles (the numbers still vary a lot due to caching).
As a consequence, the break-even point (where parallel execution becomes faster
than sequential) for e.g. Z←A+B has dropped from a ⍴,Z in the thousands down to
around 50.
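
The core of the new mechanism is a volatile per-thread counter instead of a
semaphore. Roughly like this (a minimal sketch of the idea; the attached
Parallel.cc implements the same pattern with its "mileage" counters):

    // each thread owns one counter; only the owner ever writes it
    volatile int mileage = 0;

    // forker side: advancing the counter releases the waiting threads
    void advance()   { ++mileage; }

    // waiter side: spin until the observed counter reaches the expected
    // value; no system call, hence the much shorter fork()/join() times
    void wait_for(volatile const int & counter, int expected)
    {
       while (counter != expected)   /* busy wait */ ;
    }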

I will now start to integrate the Parallel.cc prototype (see attachment) into GNU APL...

/// Jürgen

#include <fstream>
#include <iomanip> 
#include <iostream>
#include <vector>

#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

// #define USE_SEMA

// #define WORK_LEN 100000
#define WORK_LEN 1

// #define VERBOSE tells you what is happening (and disables statistics)
//
// #define VERBOSE

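// SL_*: a semaphore-like fork/join primitive: either a real POSIX
// semaphore (USE_SEMA) or a busy-waited volatile counter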
#ifdef USE_SEMA

# define SL_INIT(sptr, val) sem_init(sptr, 0, val)
# define SL_POST(sptr) sem_post(sptr)
# define SL_WAIT(sptr) sem_wait(sptr)
# define SL_t    sem_t

#else

# define SL_INIT(sptr, val) (*(sptr) = (val))
# define SL_POST(sptr)      (++*(sptr))
# define SL_WAIT(sptr)      do { while (*(sptr) <= 0) /* busy wait */ ; \
                                 --*(sptr); } while (0)
# define SL_t    volatile int

#endif

#define HAVE_affinity_np

//-----------------------------------------------------------------------------
#ifdef VERBOSE

# define INFO(th, message, mil)      \
   sem_wait(&print_sema);   \
   cout << "thread #" << th << " says: " << message; \
   if (mil != -1)   cout << " (mileage=" << mil << ")"; \
   cout << endl;  \
   sem_post(&print_sema);

#else

# define INFO(th, x, m)

#endif   // VERBOSE

/// an upper bound on the number of cores, so that we can
/// define some data structures statically
enum
{
   LOG_THREAD_LIMIT = 8,
   THREAD_LIMIT = 1 << LOG_THREAD_LIMIT
};

/// a semaphore to coordinate printouts from several threads
sem_t print_sema;

/// a semaphore to coordinate thread creation
sem_t pthread_create_sema;

//-----------------------------------------------------------------------------
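/// read the CPU's cycle counter (x86 RDTSC)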
inline uint64_t
cycle_counter()
{
unsigned int lo, hi;
   __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
   return ((uint64_t)hi << 32) | lo;
}
//-----------------------------------------------------------------------------
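/// one thread in a binomial fork/join tree. Synchronisation is lock-free:
/// every thread owns a volatile 'mileage' counter which only the owner
/// increments; fork() busy-waits on the forker's counter and join()
/// busy-waits on the counters of the forked threads.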
struct Thread_context
{
   /// constructor
   Thread_context()
   : thread_id(this - thread_contexts)
   {}

   /// start parallel execution of work
   void fork()
      {
        // wait until our master has moved forward
        //
        const int waiting_for_mileage = mileage + 1;
        while (thread_contexts[join_thread].mileage != waiting_for_mileage) /* busy wait */;
        INFO(thread_id, "forked", mileage)

        // fork our sub-threads: advancing our own mileage releases the
        // sub-threads that busy-wait on it in their fork()
        //
        ++mileage;
        INFO(thread_id,
              forked_threads_count << " worker-thread(s) forked", mileage)
      }

   /// end parallel execution of work
   inline void join();

   /// the counter that controls forking and joining of threads
   volatile int mileage;

   // initialize all but thread and thread_id
   void init(int thread_count, const vector<int> & cores);

   void reset()
      { forked_threads_count = 0; }

   const int thread_id;
   int forked_threads[LOG_THREAD_LIMIT];
   int forked_threads_count;

   int join_thread;

   pthread_t thread;

   int CPU;   // the core to which thread binds

   static Thread_context thread_contexts[];

protected:
   void add_forked(int peer)
      { forked_threads[forked_threads_count++] = peer; }

   void add_joined(int peer)
      { join_thread = peer; }

} Thread_context::thread_contexts[THREAD_LIMIT];
// ----------------------------------------------------------------------------
void
Thread_context::join()
{
   // wait for the threads that we forked
   //
   for (int f = 0; f < forked_threads_count; ++f)
       {
        // wait until sub thread f has moved forward
         const int sub = forked_threads[f];
         const Thread_context & sub_ctx = thread_contexts[sub];
         const int waiting_for_mileage = mileage + 1;
         while (sub_ctx.mileage != waiting_for_mileage)   /* busy wait */ ;

         INFO(thread_id, "worker-thread #" << sub << " (" << (f + 1) << " of "
                         << forked_threads_count << ") has joined", mileage)
       }

   // inform our forker (if any) that we are done
   //
   if (thread_id)   // not thread #0
      {
        Thread_context & forker = Thread_context::thread_contexts[join_thread];
        ++mileage;
        INFO(thread_id, "joining thread #" << join_thread, mileage)
      }
}
//-----------------------------------------------------------------------------
void do_work(int id, int mileage);

void Thread_context::init(int thread_count, const vector<int> & cores)
{
   CPU = -1;
   if (thread_id >= thread_count)   return;
   CPU = cores[thread_id % cores.size()];

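   // build a binomial tree over the thread ids: for every power-of-two
   // distance dist, a thread whose dist bit and all lower bits are 0
   // forks its peer thread_id + dist, and that peer later joins it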
   for (int dist = THREAD_LIMIT >> 1; dist; dist >>= 1)
       {
         const int mask = dist - 1;
         if (thread_id & mask)   continue;

         const int peer = thread_id ^ dist;
         if (peer >= thread_count)   continue;
         if (thread_id & dist)          continue;

         // we fork peer and peer joins us.
         this->add_forked(peer);
         thread_contexts[peer].add_joined(thread_id);
       }

   mileage = 0;
   if (thread_id < thread_count)
      {
        cerr << "thread #" << thread_id << " will start ";
        if (forked_threads_count == 0)   cerr << "no threads";
        else if (forked_threads_count == 1)   cerr << "1 thread";
        else cerr << forked_threads_count << " threads";
        for (int c = 0; c < forked_threads_count; ++c)
            cerr << " #" << forked_threads[c];

        if (thread_id)
           {
             cerr << " and will join thread #" << join_thread;
           }
        cerr << endl;
      }
}
//-----------------------------------------------------------------------------

uint64_t work_start;              ///< cycle counter before all threads forked
int64_t  work_1[THREAD_LIMIT];    ///< cycle counters before individual work
int64_t  work_2[THREAD_LIMIT];    ///< cycle counters after  individual work
int      dummy[THREAD_LIMIT] = { 0 };
uint64_t work_end;                ///< cycle counter after all threads joined

void
do_work(int thread_id, int mileage)
{
INFO(thread_id, "start work", mileage)
   work_1[thread_id] = cycle_counter();

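   // simulate WORK_LEN iterations of work; the computed value itself
   // is irrelevant, dummy[] only keeps the loop from being optimized away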
   for (int w = 0; w < WORK_LEN; ++w)
       {
         dummy[thread_id] = dummy[thread_id] * 333333 % 444444;
       }

   work_2[thread_id] = cycle_counter();
INFO(thread_id, "done work", mileage)
}
//-----------------------------------------------------------------------------
void *
pthread_main(void * arg)
{
Thread_context & ctx = *(Thread_context *)arg;

   INFO(ctx.thread_id, "thread " << ctx.thread_id << " created", ctx.mileage);
   sem_post(&pthread_create_sema);

   for (;;)
      {
        ctx.fork();
        do_work(ctx.thread_id, ctx.mileage);
        ctx.join();
      }

   // not reached
   //
   return 0;
}
//-----------------------------------------------------------------------------
int
setup_threads(int thread_count, const vector<int> & cores)
{
   // limit thread_count by THREAD_LIMIT
   //
   if (thread_count > THREAD_LIMIT)   thread_count = THREAD_LIMIT;

   // clear and initialize all Thread_contexts
   //
   cerr << "\nInitializing thread contexts (thread_count = "
        << thread_count << ")..." << endl;
   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].reset();

   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].init(thread_count, cores);

   // the main thread is #0 and we create worker-threads for #1 #2 ...
   //
   cerr << "\nCreating worker threads ..." << endl;
   Thread_context::thread_contexts[0].thread = pthread_self();
   for (int c = 1; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;
         pthread_create(&(ctx->thread), /* attr */ 0, pthread_main, ctx);

         // wait until new thread has reached its loop
         sem_wait(&pthread_create_sema);
       }

#ifdef HAVE_affinity_np

   // bind threads to cores
   //
   cerr << "\nBinding threads to cores..." << endl;
   for (int c = 0; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;

         cpu_set_t cpus;
         CPU_ZERO(&cpus);
         CPU_SET(ctx->CPU, &cpus);
         
         const int err = pthread_setaffinity_np(ctx->thread,
                                                sizeof(cpu_set_t), &cpus);
         if (err)
            {
              cerr << "pthread_setaffinity_np() failed with error "
                   << err << endl;
              exit(3);
            }

         cerr << "bound thread #" << c << " to core " << ctx->CPU << endl;
       }
#endif // HAVE_affinity_np

   return thread_count;
}
//-----------------------------------------------------------------------------

#ifdef HAVE_affinity_np

/// compute the number of cores available
static int
setup_cores(vector<int> & cores, cpu_set_t & CPUs)
{
const int err = pthread_getaffinity_np(pthread_self(),
                                       sizeof(cpu_set_t), &CPUs);
   if (err)
      {
        cerr << "pthread_getaffinity_np() failed with error "
             << err << endl;
        exit(2);
      }

   // get available CPUs (cores) but at most THREAD_LIMIT
   //
   for (int c = 0; c < THREAD_LIMIT; ++c)
       {
         if (CPU_ISSET(c, &CPUs))   cores.push_back(c);
       }

   cout << "\nThis machine has " << cores.size() << " cores:";
   for (size_t c = 0; c < cores.size(); ++c)   cout << " " << cores[c];
   cout << endl;

   return cores.size();
}
#endif // HAVE_affinity_np

//-----------------------------------------------------------------------------
void
print_times(ostream & out, int thread_count, int pass)
{
   // print execution statistics, unless VERBOSE is on (in which case
   // they make no sense).
   //
#ifdef VERBOSE
        out  << endl << "No statistics because VERBOSE was #defined" << endl;
        return;
#endif

   out << " " << thread_count << " cores/threads "
       << (work_end - work_start) << " cycles total" << endl;

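   // write per-thread start/end times to a c<threads>_p<pass>.data file;
   // the gnuplot script below plots these files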
char filename[1000];
   sprintf(filename, "c%d_p%d.data", thread_count, pass);

   // figure out when the first thread finished; join times are measured
   // from that point
   //
int64_t work_2_min = work_2[0];
   for (int c = 1; c < thread_count; ++c)
       {
         if (work_2_min > work_2[c])   work_2_min = work_2[c];
       }

ofstream plot (filename);
   for (int c = 0; c < thread_count; ++c)
       {
         plot << "    " << setw(3) << c
              << ", " << (work_1[c] - work_start)
              << ", " << (work_2[c] - work_2_min) << endl;
       }
}
//-----------------------------------------------------------------------------
int
main(int argc, const char * argv[])
{
   if (argc == 3 && !strcmp(argv[1], "--seq2"))
      {
        // print 2 3 4 .... argv[2] (for portable makefile loop)
        //
        printf("2");
        for (int j = 3; j <= atoi(argv[2]); ++j)   printf(" %d", j);
        printf("\n");
        return 0;
      }

int thread_count = 10;   if (argc >= 2)   thread_count = atoi(argv[1]);

vector<int> cores;

#ifdef HAVE_affinity_np
   // determine the available cores and remember them
   //
cpu_set_t CPUs;
   setup_cores(cores, CPUs);
   if ((int)cores.size() > thread_count)   cores.resize(thread_count);

#else

   for (int t = 0; t < thread_count; ++t)   cores.push_back(t);
#endif // HAVE_affinity_np


   // setup for parallel execution. This is normally done only once (unless
   // the number of cores/threads is changed).
   //
   sem_init(&print_sema, 0, 1);
   sem_init(&pthread_create_sema, 0, 0);

   thread_count = setup_threads(thread_count, cores);

   cerr << endl;

   // we run the loop several times so that we can see cache effects
   //
   for (int pass = 0; pass < 5; ++pass)
       {
         cout << "Pass " << pass
              << " --------------------------------------------" << endl;

         work_start = cycle_counter();

         {
           // do the same as the workers, but without calling pthread_main().
           // This avoids unnecessary tests for worker vs. master in
           // pthread_main().
           //
           Thread_context & master = Thread_context::thread_contexts[0];
           ++master.mileage;
           do_work(master.thread_id, master.mileage);
           master.join();
           ++master.mileage;
         } 

         work_end = cycle_counter();

#ifndef VERBOSE
         print_times(cout, thread_count, pass);
#endif
       }
}
//-----------------------------------------------------------------------------
#!/usr/bin/gnuplot

set term xterm

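# first plot, column 2: cycles from the common start (before the fork)
# until each thread began its work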
plot   'c4_p1.data' using 1:2 with lines lw 3 lc 1, \
       'c4_p2.data' using 1:2 with lines lw 3 lc 2, \
       'c4_p3.data' using 1:2 with lines lw 3 lc 3, \
       'c4_p4.data' using 1:2 with lines lw 3 lc 4
pause -1

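# second plot, column 3: cycles between the first thread to finish and
# each thread's own finish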
plot   'c4_p1.data' using 1:3 with lines lw 3 lc 5, \
       'c4_p2.data' using 1:3 with lines lw 3 lc 6, \
       'c4_p3.data' using 1:3 with lines lw 3 lc 7, \
       'c4_p4.data' using 1:3 with lines lw 3 lc 8
pause -1

all: Parallel

Parallel:   Parallel.cc
        g++ -O2 -o $@ $< -lpthread

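# 'make N' (for N in 2 4 8 80) runs Parallel with N threads and then
# archives the resulting .data files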
2 4 8 80:       all
        echo Running benchmarks for 1 ... $@ cores...
#       for cnt in `./Parallel --seq2 $@` ; do \
#           ./Parallel $$cnt ; done
	./Parallel $@
        tar czf result_$@.tgz *.data
#       rm -f cores_*

clean:
        rm -f Parallel *.data result_*.tgz
