Changeset: 07fb8a796c70 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=07fb8a796c70
Modified Files:
	monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:
Improvements to dataflow thread and lock handling. diffs (131 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -61,7 +61,6 @@ typedef struct queue { int size; /* size of queue */ int last; /* last element in the queue */ int exitcount; /* how many threads should exit */ - int exitedcount; /* how many threads have exited */ FlowEvent *data; MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ @@ -87,7 +86,7 @@ typedef struct DATAFLOW { static struct worker { MT_Id id; - enum {IDLE, RUNNING, EXITED} flag; + enum {IDLE, RUNNING, JOINING, EXITED} flag; Client cntxt; /* client we do work for (NULL -> any) */ MT_Sema s; } workers[THREADS]; @@ -134,7 +133,6 @@ q_create(int sz, const char *name) return NULL; } q->exitcount = 0; - q->exitedcount = 0; (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */ MT_lock_init(&q->l, name); @@ -244,9 +242,6 @@ q_dequeue(Queue *q, Client cntxt) if (q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l, "q_dequeue"); - MT_lock_set(&dataflowLock, "q_dequeue"); - q->exitedcount++; - MT_lock_unset(&dataflowLock, "q_dequeue"); return NULL; } assert(q->last > 0); @@ -458,6 +453,11 @@ DFLOWinitialize(void) for (i = 0; i < THREADS; i++) MT_sema_init(&workers[i].s, 0, "DFLOWinitialize"); limit = GDKnr_threads ? 
GDKnr_threads - 1 : 0; +#ifdef NEED_MT_LOCK_INIT + ATOMIC_INIT(exitingLock, "exitingLock"); + MT_lock_init(&dataflowLock, "dataflowLock"); +#endif + MT_lock_set(&dataflowLock, "DFLOWinitialize"); for (i = 0; i < limit; i++) { workers[i].flag = RUNNING; workers[i].cntxt = NULL; @@ -466,6 +466,7 @@ DFLOWinitialize(void) else created++; } + MT_lock_unset(&dataflowLock, "DFLOWinitialize"); if (created == 0) { /* no threads created */ q_destroy(todo); @@ -473,10 +474,6 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return -1; } -#ifdef NEED_MT_LOCK_INIT - ATOMIC_INIT(exitingLock, "exitingLock"); - MT_lock_init(&dataflowLock, "dataflowLock"); -#endif MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return 0; } @@ -766,20 +763,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m * until all work is done */ MT_lock_set(&dataflowLock, "runMALdataflow"); /* join with already exited threads */ - while (todo->exitedcount > 0) { - for (i = 0; i < THREADS; i++) { - if (workers[i].flag == EXITED) { - todo->exitedcount--; - workers[i].flag = IDLE; - workers[i].cntxt = NULL; - MT_lock_unset(&dataflowLock, "runMALdataflow"); - MT_join_thread(workers[i].id); - MT_lock_set(&dataflowLock, "runMALdataflow"); - break; + { + int joined; + do { + joined = 0; + for (i = 0; i < THREADS; i++) { + if (workers[i].flag == EXITED) { + workers[i].flag = JOINING; + workers[i].cntxt = NULL; + joined = 1; + MT_lock_unset(&dataflowLock, "runMALdataflow"); + MT_join_thread(workers[i].id); + MT_lock_set(&dataflowLock, "runMALdataflow"); + workers[i].flag = IDLE; + } } - } - if (i == THREADS) - break; + } while (joined); } for (i = 0; i < THREADS; i++) { if (workers[i].flag == IDLE) { @@ -802,13 +801,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* not doing a recursive call: create specific worker */ workers[i].cntxt = cntxt; } + workers[i].flag = RUNNING; if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { /* cannot start new 
thread, run serially */ *ret = TRUE; + workers[i].flag = IDLE; MT_lock_unset(&dataflowLock, "runMALdataflow"); return MAL_SUCCEED; } - workers[i].flag = RUNNING; break; } } @@ -899,7 +899,8 @@ stopMALdataflow(void) MT_sema_up(&todo->s, "stopMALdataflow"); MT_lock_set(&dataflowLock, "stopMALdataflow"); for (i = 0; i < THREADS; i++) { - if (workers[i].flag != IDLE) { + if (workers[i].flag != IDLE && workers[i].flag != JOINING) { + workers[i].flag = JOINING; MT_lock_unset(&dataflowLock, "stopMALdataflow"); MT_join_thread(workers[i].id); MT_lock_set(&dataflowLock, "stopMALdataflow"); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list