Changeset: 6384e8cbc8a3 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/6384e8cbc8a3 Modified Files: monetdb5/mal/mal_dataflow.c Branch: default Log Message:
Use linked lists instead of running through all entries. diffs (211 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -20,7 +20,7 @@ * are available or resources become scarce. * * The flow graphs is organized such that parallel threads can - * access it mostly without expensive locking and dependent + * access it mostly without expensive locking and dependent * variables are easy to find.. */ #include "monetdb_config.h" @@ -84,7 +84,12 @@ static struct worker { ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */ char *errbuf; /* GDKerrbuf so that we can allocate before fork */ MT_Sema s; + int next; } workers[THREADS]; +/* heads of two mutually exclusive linked lists, both using the .next + * field in the worker struct */ +static int exited_workers = -1; /* to be joined threads */ +static int idle_workers = -1; /* idle workers (no thread associated) */ static Queue *todo = 0; /* pending instructions */ @@ -97,6 +102,8 @@ mal_dataflow_reset(void) { stopMALdataflow(); memset((char*) workers, 0, sizeof(workers)); + exited_workers = -1; + idle_workers = -1; if( todo) { GDKfree(todo->data); MT_lock_destroy(&todo->l); @@ -437,6 +444,8 @@ DFLOWworker(void *T) GDKsetbuf(0); MT_lock_set(&dataflowLock); t->flag = EXITED; + t->next = exited_workers; + exited_workers = (int) (t - workers); MT_lock_unset(&dataflowLock); } @@ -465,11 +474,14 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock); return -1; } + assert(idle_workers == -1); for (i = 0; i < THREADS; i++) { char name[MT_NAME_LEN]; snprintf(name, sizeof(name), "DFLOWsema%d", i); MT_sema_init(&workers[i].s, 0, name); workers[i].flag = IDLE; + workers[i].next = idle_workers; + idle_workers = i; if (first) /* only initialize once */ ATOMIC_PTR_INIT(&workers[i].cntxt, NULL); } @@ -478,12 +490,15 @@ DFLOWinitialize(void) if (limit > THREADS) limit = THREADS; MT_lock_set(&dataflowLock); - for (i = 0; i < limit; i++) { + while (limit > 0) { + limit--; + i = idle_workers; workers[i].errbuf = GDKmalloc(GDKMAXERRLEN); if (workers[i].errbuf == NULL) { TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer for worker"); continue; } + idle_workers = workers[i].next; workers[i].flag = RUNNING; ATOMIC_PTR_SET(&workers[i].cntxt, NULL); char name[MT_NAME_LEN]; @@ -492,6 +507,8 @@ DFLOWinitialize(void) GDKfree(workers[i].errbuf); workers[i].errbuf = NULL; workers[i].flag = IDLE; + workers[i].next = idle_workers; + idle_workers = i; } else created++; } @@ -770,64 +787,60 @@ runMALdataflow(Client cntxt, MalBlkPtr m * until all work is done */ MT_lock_set(&dataflowLock); /* join with already exited threads */ - { - int joined; - do { - joined = 0; - for (i = 0; i < THREADS; i++) { - if (workers[i].flag == EXITED) { - workers[i].flag = JOINING; - ATOMIC_PTR_SET(&workers[i].cntxt, NULL); - joined = 1; - MT_lock_unset(&dataflowLock); - MT_join_thread(workers[i].id); - MT_lock_set(&dataflowLock); - workers[i].flag = IDLE; + while (exited_workers >= 0) { + i = exited_workers; + exited_workers = workers[i].next; + assert(workers[i].flag == EXITED); + workers[i].flag = JOINING; + MT_lock_unset(&dataflowLock); + MT_join_thread(workers[i].id); + MT_lock_set(&dataflowLock); + workers[i].flag = IDLE; + workers[i].next = idle_workers; + idle_workers = i; + } + if ((i = idle_workers) >= 0) { + assert(workers[i].flag == IDLE); + /* only create specific worker if we are not doing a + * recursive call */ + if (stk->calldepth > 1) { + int j; + MT_Id pid = MT_getpid(); + + /* doing a recursive call: copy specificity from + * current worker to new worker */ + ATOMIC_PTR_SET(&workers[i].cntxt, NULL); + for (j = 0; j < THREADS; j++) { + if (workers[j].flag == RUNNING && workers[j].id == pid) { + ATOMIC_PTR_SET(&workers[i].cntxt, + ATOMIC_PTR_GET(&workers[j].cntxt)); + break; } } - } while (joined); - } - for (i = 0; i < THREADS; i++) { - if (workers[i].flag == IDLE) { - /* only create specific worker if we are not doing a - * recursive call */ - if (stk->calldepth > 1) { - int j; - MT_Id pid = MT_getpid(); - - /* doing a recursive call: copy specificity from - * current worker to new worker */ - ATOMIC_PTR_SET(&workers[i].cntxt, NULL); - for (j = 0; j < THREADS; j++) { - if (workers[j].flag == RUNNING && workers[j].id == pid) { - ATOMIC_PTR_SET(&workers[i].cntxt, - ATOMIC_PTR_GET(&workers[j].cntxt)); - break; - } - } - } else { - /* not doing a recursive call: create specific worker */ - ATOMIC_PTR_SET(&workers[i].cntxt, cntxt); - } - workers[i].flag = RUNNING; - char name[MT_NAME_LEN]; - snprintf(name, sizeof(name), "DFLOWworker%d", i); - if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == NULL || - (workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i], - MT_THR_JOINABLE, name)) == 0) { - /* cannot start new thread, run serially */ - *ret = TRUE; - GDKfree(workers[i].errbuf); - workers[i].errbuf = NULL; - workers[i].flag = IDLE; - MT_lock_unset(&dataflowLock); - return MAL_SUCCEED; - } - break; + } else { + /* not doing a recursive call: create specific worker */ + ATOMIC_PTR_SET(&workers[i].cntxt, cntxt); + } + idle_workers = workers[i].next; + workers[i].flag = RUNNING; + char name[MT_NAME_LEN]; + snprintf(name, sizeof(name), "DFLOWworker%d", i); + if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == NULL || + (workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i], + MT_THR_JOINABLE, name)) == 0) { + /* cannot start new thread, run serially */ + *ret = TRUE; + GDKfree(workers[i].errbuf); + workers[i].errbuf = NULL; + workers[i].flag = IDLE; + workers[i].next = idle_workers; + idle_workers = i; + MT_lock_unset(&dataflowLock); + return MAL_SUCCEED; } } MT_lock_unset(&dataflowLock); - if (i == THREADS) { + if (i < 0) { /* no empty thread slots found, run serially */ *ret = TRUE; return MAL_SUCCEED; @@ -927,7 +940,11 @@ stopMALdataflow(void) MT_join_thread(workers[i].id); MT_lock_set(&dataflowLock); } - workers[i].flag = IDLE; + if (workers[i].flag != IDLE) { + workers[i].flag = IDLE; + workers[i].next = idle_workers; + idle_workers = i; + } MT_sema_destroy(&workers[i].s); } MT_lock_unset(&dataflowLock); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list