Hi, there is some good news from the parallel front. I have replaced the semaphores that control the fork() and join() of threads by a new, busy-wait-based mechanism. That has reduced the fork() and join() times considerably: from the 20,000-cycle ballpark to less than 1000 cycles (the numbers still vary a lot due to caching). As a consequence, the break-even point (where parallel is faster than sequential) for e.g. Z←A+B has dropped from a ⍴,Z of some 1000s to some 50s. I will now start to integrate the Parallel.cc prototype (see attachment) into GNU APL... /// Jürgen
#include <fstream> #include <iomanip> #include <iostream> #include <vector>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

// #define USE_SEMA to implement the SL_xxx primitives below with classical
// POSIX semaphores instead of (the default) busy-waited counters
//
// #define USE_SEMA

// the number of iterations of the simulated work in do_work()
//
// #define WORK_LEN 100000
#define WORK_LEN 1

// #define VERBOSE tells you what is happening (and disables statistics)
//
// #define VERBOSE

// SL_xxx: semaphore-like primitives, backed either by real semaphores
// (USE_SEMA) or by a busy-waited volatile counter.
// NOTE(review): the busy-wait variants are multi-statement macros without a
// do { } while (0) wrapper, so they misbehave after a brace-less if/else;
// they also appear to be unused in this file — confirm before relying on them.
//
#ifdef USE_SEMA
# define SL_INIT(sptr, val) sem_init(sptr, 0, val)
# define SL_POST(sptr) sem_post(sptr)
# define SL_WAIT(sptr) sem_wait(sptr)
# define SL_t sem_t
#else
# define SL_INIT(sptr, val) *sptr = val;
# define SL_POST(sptr) ++*sptr;
# define SL_WAIT(sptr) while (*sptr <= 0) ; --*sptr;
# define SL_t volatile int
#endif

// use the GNU pthread_{get,set}affinity_np() extensions to bind threads
// to cores
#define HAVE_affinity_np

//-----------------------------------------------------------------------------
// INFO(thread, message, mileage): print a progress message, serialized with
// print_sema so that output from different threads does not interleave.
// Compiled away unless VERBOSE is #defined (printing would distort the
// cycle measurements).
//
#ifdef VERBOSE
# define INFO(th, message, mil) \
   sem_wait(&print_sema); \
   cout << "thread #" << th << " says: " << message; \
   if (mil != -1) cout << " (mileage=" << mil << ")"; \
   cout << endl; \
   sem_post(&print_sema);
#else
# define INFO(th, x, m)
#endif // VERBOSE

/// an upper bound on the number of cores, so that we can
/// define some data structures statically
enum
{
   LOG_THREAD_LIMIT = 8,
   THREAD_LIMIT = 1 << LOG_THREAD_LIMIT
};

/// a semaphore to coordinate printouts from several threads
sem_t print_sema;

/// a semaphore to coordinate thread creation (handshake between
/// setup_threads() and each newly started pthread_main())
sem_t pthread_create_sema;

//-----------------------------------------------------------------------------
/// read the CPU's time stamp counter (x86 RDTSC); all timing in this
/// prototype is measured in these cycles
inline uint64_t cycle_counter()
{
   unsigned int lo, hi;
   __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
   return ((uint64_t)hi << 32) | lo;
}
//-----------------------------------------------------------------------------
/// the per-thread state of one thread in a binary fork/join tree.
/// Thread #0 is the master. fork() and join() synchronize through the
/// busy-waited counter 'mileage': a thread signals that it has moved one
/// step forward by incrementing its own mileage, and waits for other
/// threads by polling theirs.
struct Thread_context
{
   /// constructor: derive our thread id from our position in the
   /// static thread_contexts[] array
   Thread_context()
   : thread_id(this - thread_contexts)
   {}

   /// start parallel execution of work
   void fork()
      {
        // wait until our master (the thread that forks us) has moved forward
        //
        const int waiting_for_mileage = mileage + 1;
        while (thread_contexts[join_thread].mileage != waiting_for_mileage)
              /* busy wait */ ;

        INFO(thread_id, "forked", mileage)

        // fork our sub-threads: incrementing our own mileage releases the
        // busy wait (above) in every thread that we fork
        //
        ++mileage;

        INFO(thread_id, forked_threads_count <<
                        " worker-thread(s) forked", mileage)
      }

   /// end parallel execution of work
   inline void join();

   /// the counter that controls forking and joining of threads
   /// (volatile because other threads poll it in busy-wait loops)
   volatile int mileage;

   /// initialize all but thread and thread_id
   void init(int thread_count, const vector<int> & cores);

   /// forget the threads forked (setup_threads() calls this before init())
   void reset()
      { forked_threads_count = 0; }

   /// our index in thread_contexts[]
   const int thread_id;

   /// the ids of the threads that we fork (at most one per tree level)
   int forked_threads[LOG_THREAD_LIMIT];

   /// the number of valid entries in forked_threads[]
   int forked_threads_count;

   /// the id of the thread that forks us (and whose mileage we poll)
   int join_thread;

   /// the POSIX thread executing this context
   pthread_t thread;

   int CPU;   // the core to which thread binds

   /// one context per (potential) thread
   static Thread_context thread_contexts[];

protected:
   /// remember that we fork thread peer
   void add_forked(int peer)
      { forked_threads[forked_threads_count++] = peer; }

   /// remember that we report back to (are joined by) thread peer
   void add_joined(int peer)
      { join_thread = peer; }

// NOTE(review): defining the static member in the same declaration as the
// class is unusual; a separate definition line would be more conventional.
} Thread_context::thread_contexts[THREAD_LIMIT];
// ----------------------------------------------------------------------------
void Thread_context::join()
{
   // wait for the threads that we forked
   //
   for (int f = 0; f < forked_threads_count; ++f)
       {
         // wait until sub thread f has moved forward
         const int sub = forked_threads[f];
         const Thread_context & sub_ctx = thread_contexts[sub];
         const int waiting_for_mileage = mileage + 1;
         while (sub_ctx.mileage != waiting_for_mileage)
               /* busy wait */ ;

         INFO(thread_id, "worker-thread #" << sub << " (" << (f + 1)
                         << " of " << forked_threads_count
                         << ") has joined", mileage)
       }

   // inform our forker (if any) that we are done, by moving our own
   // mileage forward
   //
   if (thread_id)   // not thread #0
      {
        Thread_context & forker = Thread_context::thread_contexts[join_thread];
        // NOTE(review): 'forker' appears to be unused (INFO prints
        // join_thread directly) — candidate for removal
        ++mileage;
        INFO(thread_id, "joining thread #" << join_thread, mileage)
      }
}
//-----------------------------------------------------------------------------
void do_work(int id, int mileage);

/// set up this context's place in the binary fork/join tree and its core
void Thread_context::init(int thread_count, const vector<int> & cores)
{
   CPU = -1;                                // mark context as unused
   if (thread_id >= thread_count) return;   // context not used in this run

   // assign cores round-robin (wraps when there are fewer cores than threads)
   CPU = cores[thread_id % cores.size()];

   // build the fork/join tree: at tree distance dist, a thread whose id is
   // a multiple of 2*dist forks the peer with id (thread_id + dist), and
   // that peer later reports back to (is joined by) us
   //
   for (int dist = THREAD_LIMIT >> 1; dist; dist >>= 1)
       {
         const int mask = dist - 1;
         if (thread_id & mask) continue;         // not aligned at this level
         const int peer = thread_id ^ dist;
         if (peer >= thread_count) continue;     // peer does not exist
         if (thread_id & dist) continue;         // the lower id forks, not us

         // we fork peer and peer joins us.
         this->add_forked(peer);
         thread_contexts[peer].add_joined(thread_id);
       }

   mileage = 0;

   // tell the user who forks whom and who joins whom
   // (the condition is always true here, since we returned above otherwise)
   //
   if (thread_id < thread_count)
      {
        cerr << "thread #" << thread_id << " will start ";
        if (forked_threads_count == 0) cerr << "no threads";
        else if (forked_threads_count == 1) cerr << "1 thread";
        else cerr << forked_threads_count << " threads";
        for (int c = 0; c < forked_threads_count; ++c)
            cerr << " #" << forked_threads[c];
        if (thread_id)
           {
             cerr << " and will join thread #" << join_thread;
           }
        cerr << endl;
      }
}
//-----------------------------------------------------------------------------
uint64_t work_start;              ///< cycle counter before all threads forked
int64_t work_1[THREAD_LIMIT];     ///< cycle counters before individual work
int64_t work_2[THREAD_LIMIT];     ///< cycle counters after individual work
int dummy[THREAD_LIMIT] = { 0 };  ///< per-thread scratch written by the work
                                  ///< loop (global, so it cannot be elided)
uint64_t work_end;                ///< cycle counter after all threads joined

/// simulate some work: WORK_LEN iterations of a multiply/modulo on this
/// thread's dummy[] cell, with cycle counters taken before and after
void do_work(int thread_id, int mileage)
{
   INFO(thread_id, "start work", mileage)

   work_1[thread_id] = cycle_counter();
   for (int w = 0; w < WORK_LEN; ++w)
       {
         dummy[thread_id] = dummy[thread_id] * 333333 % 444444;
       }
   work_2[thread_id] = cycle_counter();

   INFO(thread_id, "done work", mileage)
}
//-----------------------------------------------------------------------------
/// the main function of every worker thread: report creation, then loop
/// forever doing fork() -> do_work() -> join(); never returns
void * pthread_main(void * arg)
{
   Thread_context & ctx = *(Thread_context *)arg;

   INFO(ctx.thread_id, "thread " << ctx.thread_id << " created", ctx.mileage);

   // tell setup_threads() that we have started
   sem_post(&pthread_create_sema);

   for (;;)
       {
         ctx.fork();
         do_work(ctx.thread_id, ctx.mileage);
         ctx.join();
       }

   // not reached
   // return 0;
}
//-----------------------------------------------------------------------------
/// create worker threads #1, #2, ... (the caller is thread #0), initialize
/// the fork/join tree, and bind all threads to their cores.
/// \return the (possibly reduced) number of threads actually used
int setup_threads(int thread_count, const vector<int> & cores)
{
   // limit thread_count by THREAD_LIMIT
   //
   if (thread_count > THREAD_LIMIT) thread_count = THREAD_LIMIT;

   // clear and initialize all Thread_contexts
   //
   cerr << "\nInitializing thread contexts (thread_count = "
        << thread_count << ")..." << endl;
   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].reset();
   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].init(thread_count, cores);

   // the main thread is #0 and we create worker-threads for #1 #2 ...
   //
   cerr << "\nCreating worker threads ..." << endl;
   Thread_context::thread_contexts[0].thread = pthread_self();
   for (int c = 1; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;
         // NOTE(review): the pthread_create() return value is not checked
         pthread_create(&(ctx->thread), /* attr */ 0, pthread_main, ctx);

         // wait until new thread has reached its loop
         sem_wait(&pthread_create_sema);
       }

#ifdef HAVE_affinity_np
   // bind threads to cores
   //
   cerr << "\nBinding threads to cores..." << endl;
   for (int c = 0; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;
         cpu_set_t cpus;
         CPU_ZERO(&cpus);
         CPU_SET(ctx->CPU, &cpus);
         const int err = pthread_setaffinity_np(ctx->thread,
                                                sizeof(cpu_set_t), &cpus);
         if (err)
            {
              cerr << "pthread_setaffinity_np() failed with error "
                   << err << endl;
              exit(3);
            }
         cerr << "bound thread #" << c << " to core " << ctx->CPU << endl;
       }
#endif // HAVE_affinity_np

   return thread_count;
}
//-----------------------------------------------------------------------------
#ifdef HAVE_affinity_np
/// determine the cores available to this process (from its CPU affinity
/// mask) and append their numbers to cores; return the number of cores
static int setup_cores(vector<int> & cores, cpu_set_t & CPUs)
{
   const int err = pthread_getaffinity_np(pthread_self(),
                                          sizeof(cpu_set_t), &CPUs);
   if (err)
      {
        cerr << "pthread_getaffinity_np() failed with error " << err << endl;
        exit(2);
      }

   // get available CPUs (cores) but at most THREAD_LIMIT
   //
   for (int c = 0; c < THREAD_LIMIT; ++c)
       {
         if (CPU_ISSET(c, &CPUs)) cores.push_back(c);
       }

   cout << "\nThis machine has " << cores.size() << " cores:";
   for (int c = 0; c < cores.size(); ++c) cout << " " << cores[c];
   cout << endl;

   return cores.size();
}
#else
#endif // HAVE_affinity_np
//----------------------------------------------------------------------------- void print_times(ostream & out, int thread_count, int pass) { // print execution statistics, unless VERBOSE is on (in which case that // makes no sense. // #ifdef VERBOSE out << endl << "No statistics because VERBOSE was #defined" << endl; return; #endif out << " " << thread_count << " cores/threads " << (work_end - work_start) << " cycles total" << endl; char filename[1000]; sprintf(filename, "c%d_p%d.data", thread_count, pass); // figure when the first thread was finished. That is the time from // where we measure the join time // int64_t work_2_min = work_2[0]; for (int c = 1; c < thread_count; ++c) { if (work_2_min > work_2[c]) (work_2_min = work_2[c]); } ofstream plot (filename); for (int c = 0; c < thread_count; ++c) { plot << " " << setw(3) << c << ", " << (work_1[c] - work_start) << ", " << (work_2[c] - work_2_min) << endl; } } //----------------------------------------------------------------------------- int main(int argc, const char * argv[]) { if (argc == 3 && !strcmp(argv[1], "--seq2")) { // print 2 3 4 .... argv[2] (for portable makefile loop) // printf("2"); for (int j = 3; j <= atoi(argv[2]); ++j) printf(" %d", j); printf("\n"); return 0; } int thread_count = 10; if (argc >= 2) thread_count = atoi(argv[1]); vector<int> cores; #ifdef HAVE_affinity_np // determine the available cores and remember them // cpu_set_t CPUs; setup_cores(cores, CPUs); if (cores.size() > thread_count) cores.resize(thread_count); #else for (int t = 0; t < thread_count; ++t) cores.push_back(t); #endif // HAVE_affinity_np // setup for parallel execution. This is normally done only once (unless // the number of cores/threads is changed. 
// sem_init(&print_sema, 0, 1); sem_init(&pthread_create_sema, 0, 0); thread_count = setup_threads(thread_count, cores); cerr << endl; // we run the loop several times so that we can see cache effects // for (int pass = 0; pass < 5; ++pass) { cout << "Pass " << pass << " --------------------------------------------" << endl; work_start = cycle_counter(); { // do the same as the workers, but without calling pthread_main(). // this is to avoid unneccessary tests for worker vs. master in // pthread_main(). // Thread_context & master = Thread_context::thread_contexts[0]; ++master.mileage; do_work(master.thread_id, master.mileage); master.join(); ++master.mileage; } work_end = cycle_counter(); #ifndef VERBOSE print_times(cout, thread_count, pass); #endif } } //-----------------------------------------------------------------------------
#!/usr/bin/gnuplot
# Plot the per-thread timing files written by print_times() in Parallel.cc.
# NOTE(review): the file names assume a 4-thread run (c4_*) and passes 1..4;
# adjust for other thread counts/passes.
set term xterm

# first picture: column 2 of each data file (per-thread fork latency),
# one curve per pass; press <return> for the next picture
plot 'c4_p1.data' using 1:2 with lines lw 3 lc 1, \
     'c4_p2.data' using 1:2 with lines lw 3 lc 2, \
     'c4_p3.data' using 1:2 with lines lw 3 lc 3, \
     'c4_p4.data' using 1:2 with lines lw 3 lc 4
pause -1

# second picture: column 3 of each data file (how long after the first
# finisher each thread completed its work), one curve per pass
plot 'c4_p1.data' using 1:3 with lines lw 3 lc 5, \
     'c4_p2.data' using 1:3 with lines lw 3 lc 6, \
     'c4_p3.data' using 1:3 with lines lw 3 lc 7, \
     'c4_p4.data' using 1:3 with lines lw 3 lc 8
pause -1
# Build and benchmark the fork()/join() prototype.

all: Parallel

# single translation unit; -lpthread for the worker threads
Parallel: Parallel.cc
	g++ -O2 -o $@ $< -lpthread

# "make <N>" (N in 2 4 8 80) runs the benchmark with N threads and archives
# the resulting *.data files. The commented-out loop used to run every
# count 2..N via --seq2; the stray "; done" it left behind (a shell syntax
# error) has been removed from the active command.
2 4 8 80: all
	echo Running benchmarks for 1 ... $@ cores...
#	for cnt in `./Parallel --seq2 $@` ; do \
#	./Parallel $$cnt ; done
	./Parallel $@
	tar czf result_$@.tgz *.data
#	rm -f cores_*

clean:
	rm -f Parallel *.data result_*.tgz