Changeset: 7994be7caa4b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7994be7caa4b
Modified Files:
	monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:
Use private dataflow lock, and break thread join loop when no threads found. diffs (142 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -97,6 +97,8 @@ static MT_Lock exitingLock MT_LOCK_INITI #endif static volatile ATOMIC_TYPE exiting = 0; +static MT_Lock dataflowLock MT_LOCK_INITIALIZER("dataflowLock"); + /* * Calculate the size of the dataflow dependency graph. */ @@ -242,9 +244,9 @@ q_dequeue(Queue *q, Client cntxt) if (q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l, "q_dequeue"); - MT_lock_set(&mal_contextLock, "q_dequeue"); + MT_lock_set(&dataflowLock, "q_dequeue"); q->exitedcount++; - MT_lock_unset(&mal_contextLock, "q_dequeue"); + MT_lock_unset(&dataflowLock, "q_dequeue"); return NULL; } assert(q->last > 0); @@ -304,18 +306,18 @@ DFLOWworker(void *T) GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; - MT_lock_set(&mal_contextLock, "DFLOWworker"); + MT_lock_set(&dataflowLock, "DFLOWworker"); cntxt = t->cntxt; - MT_lock_unset(&mal_contextLock, "DFLOWworker"); + MT_lock_unset(&dataflowLock, "DFLOWworker"); if (cntxt) { /* wait until we are allowed to start working */ MT_sema_down(&t->s, "DFLOWworker"); } while (1) { if (fnxt == 0) { - MT_lock_set(&mal_contextLock, "DFLOWworker"); + MT_lock_set(&dataflowLock, "DFLOWworker"); cntxt = t->cntxt; - MT_lock_unset(&mal_contextLock, "DFLOWworker"); + MT_lock_unset(&dataflowLock, "DFLOWworker"); fe = q_dequeue(todo, cntxt); if (fe == NULL) { if (cntxt) { @@ -424,9 +426,9 @@ DFLOWworker(void *T) GDKfree(GDKerrbuf); GDKsetbuf(0); THRdel(thr); - MT_lock_set(&mal_contextLock, "DFLOWworker"); + MT_lock_set(&dataflowLock, "DFLOWworker"); t->flag = EXITED; - MT_lock_unset(&mal_contextLock, "DFLOWworker"); + MT_lock_unset(&dataflowLock, "DFLOWworker"); } /* @@ -473,6 +475,7 @@ DFLOWinitialize(void) } #ifdef NEED_MT_LOCK_INIT ATOMIC_INIT(exitingLock, "exitingLock"); + 
MT_lock_init(&dataflowLock, "dataflowLock"); #endif MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return 0; @@ -695,9 +698,9 @@ DFLOWscheduler(DataFlow flow, struct wor } /* release the worker from its specific task (turn it into a * generic worker) */ - MT_lock_set(&mal_contextLock, "DFLOWscheduler"); + MT_lock_set(&dataflowLock, "DFLOWscheduler"); w->cntxt = NULL; - MT_lock_unset(&mal_contextLock, "DFLOWscheduler"); + MT_lock_unset(&dataflowLock, "DFLOWscheduler"); /* wrap up errors */ assert(flow->done->last == 0); if (flow->error ) { @@ -761,7 +764,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* in addition, create one more worker that will only execute * tasks for the current client to compensate for our waiting * until all work is done */ - MT_lock_set(&mal_contextLock, "runMALdataflow"); + MT_lock_set(&dataflowLock, "runMALdataflow"); /* join with already exited threads */ while (todo->exitedcount > 0) { for (i = 0; i < THREADS; i++) { @@ -769,12 +772,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m todo->exitedcount--; workers[i].flag = IDLE; workers[i].cntxt = NULL; - MT_lock_unset(&mal_contextLock, "runMALdataflow"); + MT_lock_unset(&dataflowLock, "runMALdataflow"); MT_join_thread(workers[i].id); - MT_lock_set(&mal_contextLock, "runMALdataflow"); + MT_lock_set(&dataflowLock, "runMALdataflow"); break; } } + if (i == THREADS) + break; } for (i = 0; i < THREADS; i++) { if (workers[i].flag == IDLE) { @@ -800,14 +805,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { /* cannot start new thread, run serially */ *ret = TRUE; - MT_lock_unset(&mal_contextLock, "runMALdataflow"); + MT_lock_unset(&dataflowLock, "runMALdataflow"); return MAL_SUCCEED; } workers[i].flag = RUNNING; break; } } - MT_lock_unset(&mal_contextLock, "runMALdataflow"); + MT_lock_unset(&dataflowLock, "runMALdataflow"); if (i == THREADS) { /* no empty thread slots found, run serially */ *ret = TRUE; 
@@ -892,15 +897,15 @@ stopMALdataflow(void) if (todo) { for (i = 0; i < THREADS; i++) MT_sema_up(&todo->s, "stopMALdataflow"); - MT_lock_set(&mal_contextLock, "stopMALdataflow"); + MT_lock_set(&dataflowLock, "stopMALdataflow"); for (i = 0; i < THREADS; i++) { if (workers[i].flag != IDLE) { - MT_lock_unset(&mal_contextLock, "stopMALdataflow"); + MT_lock_unset(&dataflowLock, "stopMALdataflow"); MT_join_thread(workers[i].id); - MT_lock_set(&mal_contextLock, "stopMALdataflow"); + MT_lock_set(&dataflowLock, "stopMALdataflow"); } workers[i].flag = IDLE; } - MT_lock_unset(&mal_contextLock, "stopMALdataflow"); + MT_lock_unset(&dataflowLock, "stopMALdataflow"); } } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list