Changeset: 7994be7caa4b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7994be7caa4b
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:

Use private dataflow lock, and break thread join loop when no threads found.


diffs (142 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -97,6 +97,8 @@ static MT_Lock exitingLock MT_LOCK_INITI
 #endif
 static volatile ATOMIC_TYPE exiting = 0;
 
+static MT_Lock dataflowLock MT_LOCK_INITIALIZER("dataflowLock");
+
 /*
  * Calculate the size of the dataflow dependency graph.
  */
@@ -242,9 +244,9 @@ q_dequeue(Queue *q, Client cntxt)
        if (q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l, "q_dequeue");
-               MT_lock_set(&mal_contextLock, "q_dequeue");
+               MT_lock_set(&dataflowLock, "q_dequeue");
                q->exitedcount++;
-               MT_lock_unset(&mal_contextLock, "q_dequeue");
+               MT_lock_unset(&dataflowLock, "q_dequeue");
                return NULL;
        }
        assert(q->last > 0);
@@ -304,18 +306,18 @@ DFLOWworker(void *T)
 
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
-       MT_lock_set(&mal_contextLock, "DFLOWworker");
+       MT_lock_set(&dataflowLock, "DFLOWworker");
        cntxt = t->cntxt;
-       MT_lock_unset(&mal_contextLock, "DFLOWworker");
+       MT_lock_unset(&dataflowLock, "DFLOWworker");
        if (cntxt) {
                /* wait until we are allowed to start working */
                MT_sema_down(&t->s, "DFLOWworker");
        }
        while (1) {
                if (fnxt == 0) {
-                       MT_lock_set(&mal_contextLock, "DFLOWworker");
+                       MT_lock_set(&dataflowLock, "DFLOWworker");
                        cntxt = t->cntxt;
-                       MT_lock_unset(&mal_contextLock, "DFLOWworker");
+                       MT_lock_unset(&dataflowLock, "DFLOWworker");
                        fe = q_dequeue(todo, cntxt);
                        if (fe == NULL) {
                                if (cntxt) {
@@ -424,9 +426,9 @@ DFLOWworker(void *T)
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
        THRdel(thr);
-       MT_lock_set(&mal_contextLock, "DFLOWworker");
+       MT_lock_set(&dataflowLock, "DFLOWworker");
        t->flag = EXITED;
-       MT_lock_unset(&mal_contextLock, "DFLOWworker");
+       MT_lock_unset(&dataflowLock, "DFLOWworker");
 }
 
 /*
@@ -473,6 +475,7 @@ DFLOWinitialize(void)
        }
 #ifdef NEED_MT_LOCK_INIT
        ATOMIC_INIT(exitingLock, "exitingLock");
+       MT_lock_init(&dataflowLock, "dataflowLock");
 #endif
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
        return 0;
@@ -695,9 +698,9 @@ DFLOWscheduler(DataFlow flow, struct wor
        }
        /* release the worker from its specific task (turn it into a
         * generic worker) */
-       MT_lock_set(&mal_contextLock, "DFLOWscheduler");
+       MT_lock_set(&dataflowLock, "DFLOWscheduler");
        w->cntxt = NULL;
-       MT_lock_unset(&mal_contextLock, "DFLOWscheduler");
+       MT_lock_unset(&dataflowLock, "DFLOWscheduler");
        /* wrap up errors */
        assert(flow->done->last == 0);
        if (flow->error ) {
@@ -761,7 +764,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        /* in addition, create one more worker that will only execute
         * tasks for the current client to compensate for our waiting
         * until all work is done */
-       MT_lock_set(&mal_contextLock, "runMALdataflow");
+       MT_lock_set(&dataflowLock, "runMALdataflow");
        /* join with already exited threads */
        while (todo->exitedcount > 0) {
                for (i = 0; i < THREADS; i++) {
@@ -769,12 +772,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                                todo->exitedcount--;
                                workers[i].flag = IDLE;
                                workers[i].cntxt = NULL;
-                               MT_lock_unset(&mal_contextLock, 
"runMALdataflow");
+                               MT_lock_unset(&dataflowLock, "runMALdataflow");
                                MT_join_thread(workers[i].id);
-                               MT_lock_set(&mal_contextLock, "runMALdataflow");
+                               MT_lock_set(&dataflowLock, "runMALdataflow");
                                break;
                        }
                }
+               if (i == THREADS)
+                       break;
        }
        for (i = 0; i < THREADS; i++) {
                if (workers[i].flag == IDLE) {
@@ -800,14 +805,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                        if (MT_create_thread(&workers[i].id, DFLOWworker, (void 
*) &workers[i], MT_THR_JOINABLE) < 0) {
                                /* cannot start new thread, run serially */
                                *ret = TRUE;
-                               MT_lock_unset(&mal_contextLock, 
"runMALdataflow");
+                               MT_lock_unset(&dataflowLock, "runMALdataflow");
                                return MAL_SUCCEED;
                        }
                        workers[i].flag = RUNNING;
                        break;
                }
        }
-       MT_lock_unset(&mal_contextLock, "runMALdataflow");
+       MT_lock_unset(&dataflowLock, "runMALdataflow");
        if (i == THREADS) {
                /* no empty thread slots found, run serially */
                *ret = TRUE;
@@ -892,15 +897,15 @@ stopMALdataflow(void)
        if (todo) {
                for (i = 0; i < THREADS; i++)
                        MT_sema_up(&todo->s, "stopMALdataflow");
-               MT_lock_set(&mal_contextLock, "stopMALdataflow");
+               MT_lock_set(&dataflowLock, "stopMALdataflow");
                for (i = 0; i < THREADS; i++) {
                        if (workers[i].flag != IDLE) {
-                               MT_lock_unset(&mal_contextLock, 
"stopMALdataflow");
+                               MT_lock_unset(&dataflowLock, "stopMALdataflow");
                                MT_join_thread(workers[i].id);
-                               MT_lock_set(&mal_contextLock, 
"stopMALdataflow");
+                               MT_lock_set(&dataflowLock, "stopMALdataflow");
                        }
                        workers[i].flag = IDLE;
                }
-               MT_lock_unset(&mal_contextLock, "stopMALdataflow");
+               MT_lock_unset(&dataflowLock, "stopMALdataflow");
        }
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to