Changeset: 28efffc236b7 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=28efffc236b7
Modified Files:
    gdk/gdk_calc_private.h
    gdk/gdk_private.h
    monetdb5/mal/mal_dataflow.c
    monetdb5/mal/mal_private.h
Branch: default
Log Message:
Merge with Jan2014 branch. diffs (173 lines): diff --git a/gdk/gdk_calc_private.h b/gdk/gdk_calc_private.h --- a/gdk/gdk_calc_private.h +++ b/gdk/gdk_calc_private.h @@ -19,6 +19,10 @@ /* This file contains shared definitions for gdk_calc.c and gdk_aggr.c */ +#ifndef LIBGDK +#error this file should not be included outside its source directory +#endif + #ifdef HAVE_LONG_LONG typedef unsigned long long ulng; #else diff --git a/gdk/gdk_private.h b/gdk/gdk_private.h --- a/gdk/gdk_private.h +++ b/gdk/gdk_private.h @@ -19,6 +19,10 @@ /* This file should not be included in any file outside of this directory */ +#ifndef LIBGDK +#error this file should not be included outside its source directory +#endif + /* * The different parts of which a BAT consists are physically stored * next to each other in the BATstore type. diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -64,7 +64,6 @@ typedef struct queue { int size; /* size of queue */ int last; /* last element in the queue */ int exitcount; /* how many threads should exit */ - int exitedcount; /* how many threads have exited */ FlowEvent *data; MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ @@ -90,7 +89,7 @@ typedef struct DATAFLOW { static struct worker { MT_Id id; - enum {IDLE, RUNNING, EXITED} flag; + enum {IDLE, RUNNING, JOINING, EXITED} flag; Client cntxt; /* client we do work for (NULL -> any) */ MT_Sema s; } workers[THREADS]; @@ -137,7 +136,6 @@ q_create(int sz, const char *name) return NULL; } q->exitcount = 0; - q->exitedcount = 0; (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */ MT_lock_init(&q->l, name); @@ -247,9 +245,6 @@ q_dequeue(Queue *q, Client cntxt) if (q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l, "q_dequeue"); - MT_lock_set(&dataflowLock, "q_dequeue"); - q->exitedcount++; - MT_lock_unset(&dataflowLock, "q_dequeue"); return 
NULL; } assert(q->last > 0); @@ -461,6 +456,11 @@ DFLOWinitialize(void) for (i = 0; i < THREADS; i++) MT_sema_init(&workers[i].s, 0, "DFLOWinitialize"); limit = GDKnr_threads ? GDKnr_threads - 1 : 0; +#ifdef NEED_MT_LOCK_INIT + ATOMIC_INIT(exitingLock, "exitingLock"); + MT_lock_init(&dataflowLock, "dataflowLock"); +#endif + MT_lock_set(&dataflowLock, "DFLOWinitialize"); for (i = 0; i < limit; i++) { workers[i].flag = RUNNING; workers[i].cntxt = NULL; @@ -469,6 +469,7 @@ DFLOWinitialize(void) else created++; } + MT_lock_unset(&dataflowLock, "DFLOWinitialize"); if (created == 0) { /* no threads created */ q_destroy(todo); @@ -476,10 +477,6 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return -1; } -#ifdef NEED_MT_LOCK_INIT - ATOMIC_INIT(exitingLock, "exitingLock"); - MT_lock_init(&dataflowLock, "dataflowLock"); -#endif MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return 0; } @@ -769,20 +766,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m * until all work is done */ MT_lock_set(&dataflowLock, "runMALdataflow"); /* join with already exited threads */ - while (todo->exitedcount > 0) { - for (i = 0; i < THREADS; i++) { - if (workers[i].flag == EXITED) { - todo->exitedcount--; - workers[i].flag = IDLE; - workers[i].cntxt = NULL; - MT_lock_unset(&dataflowLock, "runMALdataflow"); - MT_join_thread(workers[i].id); - MT_lock_set(&dataflowLock, "runMALdataflow"); - break; + { + int joined; + do { + joined = 0; + for (i = 0; i < THREADS; i++) { + if (workers[i].flag == EXITED) { + workers[i].flag = JOINING; + workers[i].cntxt = NULL; + joined = 1; + MT_lock_unset(&dataflowLock, "runMALdataflow"); + MT_join_thread(workers[i].id); + MT_lock_set(&dataflowLock, "runMALdataflow"); + workers[i].flag = IDLE; + } } - } - if (i == THREADS) - break; + } while (joined); } for (i = 0; i < THREADS; i++) { if (workers[i].flag == IDLE) { @@ -805,13 +804,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* not doing a recursive call: create specific worker */ 
workers[i].cntxt = cntxt; } + workers[i].flag = RUNNING; if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { /* cannot start new thread, run serially */ *ret = TRUE; + workers[i].flag = IDLE; MT_lock_unset(&dataflowLock, "runMALdataflow"); return MAL_SUCCEED; } - workers[i].flag = RUNNING; break; } } @@ -902,7 +902,8 @@ stopMALdataflow(void) MT_sema_up(&todo->s, "stopMALdataflow"); MT_lock_set(&dataflowLock, "stopMALdataflow"); for (i = 0; i < THREADS; i++) { - if (workers[i].flag != IDLE) { + if (workers[i].flag != IDLE && workers[i].flag != JOINING) { + workers[i].flag = JOINING; MT_lock_unset(&dataflowLock, "stopMALdataflow"); MT_join_thread(workers[i].id); MT_lock_set(&dataflowLock, "stopMALdataflow"); diff --git a/monetdb5/mal/mal_private.h b/monetdb5/mal/mal_private.h --- a/monetdb5/mal/mal_private.h +++ b/monetdb5/mal/mal_private.h @@ -19,6 +19,10 @@ /* This file should not be included in any file outside of this directory */ +#ifndef LIBMAL +#error this file should not be included outside its source directory +#endif + #ifdef FREECLIENT /* FREECLIENT is defined in the same file as Client */ extern void MCexitClient(Client c); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list