Changeset: df0b118accb2 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=df0b118accb2
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Fixed a race condition in the dataflow scheduler.
If one of the dataflow threads came across an error, it would cause
DFLOWscheduler to return.  This triggered a cleanup of various
dataflow structures.  However, other dataflow threads were still using
those structures and could then cause a crash.
The problem is solved by signalling all threads that they should
terminate, and then joining those threads before cleanup.


diffs (162 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -86,6 +86,8 @@ typedef struct DataFlow {
        int    nway;            /* number of workers */
        FlowTask *worker;       /* worker threads for the client */
        struct DataFlow *free;  /* free list */
+       int terminate;                  /* set if we need to terminate */
+       MT_Lock termlock;               /* lock to protect the above */
 } *DataFlow, DataFlowRec;
 
 /* does not seem to have a major impact */
@@ -249,14 +251,12 @@ q_create(int sz)
        return q;
 }
 
-/*
 static void
 q_destroy(queue *q)
 {
        GDKfree(q->data);
        GDKfree(q);
 }
-*/
 
 /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */
 /* we might actually sort it for better scheduling behavior */
@@ -314,9 +314,10 @@ q_dequeue(queue *q)
 
        MT_sema_down(&q->s, "q_dequeue");
        MT_lock_set(&q->l, "q_dequeue");
-       assert(q->last > 0);
-       /* LIFO favors garbage collection */
-       r = q->data[--q->last];
+       if (q->last > 0)
+               /* LIFO favors garbage collection */
+               r = q->data[--q->last];
+       /* else: terminating */
        /* try out random draw *
        {
                int i;
@@ -824,6 +825,12 @@ runDFLOWworker(void *t)
                local = nxtfs != 0;
                if (nxtfs == 0) {
                        fs = (FlowStatus)q_dequeue(task->todo);
+                       MT_lock_set(&task->flow->termlock, "runDFLOWworker");
+                       if (task->flow->terminate) {
+                               MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
+                               break;
+                       }
+                       MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
 
 #ifdef USE_DFLOW_ADMISSION
                        if (DFLOWadmission(fs->argclaim, fs->hotclaim)) {
@@ -1168,7 +1175,6 @@ DFLOWscheduler(DataFlow flow)
        return ret;
 }
 
-static DataFlow flows = NULL;
 static int workerid = 0;
 
 str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc,
@@ -1177,6 +1183,7 @@ str runMALdataflow(Client cntxt, MalBlkP
        DataFlow flow = NULL;
        str ret = MAL_SUCCEED;
        int size;
+       int i;
 
 #ifdef DEBUG_FLOW
        mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc, stoppc);
@@ -1191,37 +1198,33 @@ str runMALdataflow(Client cntxt, MalBlkP
                return MAL_SUCCEED;
 
        assert(stoppc > startpc);
-       mal_set_lock(mal_contextLock, "runMALdataflow");
-       flow = flows;
 
-       if (flow) {
-               flows = flow->free;
-       } else {
-               int i;
+       flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+       MT_lock_init(&flow->termlock, "runMALdataflow");
 
-               flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+       /* seems enough for the time being */
+       flow->done = q_create(2048);
+       flow->todo = q_create(2048);
 
-               /* seems enough for the time being */
-               flow->done = q_create(2048);
-               flow->todo = q_create(2048);
+       /* queues are available? */
+       if (flow->done == NULL || flow->todo == NULL) {
+               return MAL_SUCCEED;
+       }
 
-               /* queues are available? */
-               if (flow->done == NULL || flow->todo == NULL) {
-                       mal_unset_lock(mal_contextLock, "runMALdataflow");
-                       return MAL_SUCCEED;
-               }
+       flow->worker = NULL;
+       flow->nway = GDKnr_threads ? GDKnr_threads : 1;
+       if (flow->nway > stoppc - startpc)
+               flow->nway = stoppc - startpc;
+       flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
+       for (i = 0; i < flow->nway; i++) {
+               flow->worker[i].id = workerid++;
+               flow->worker[i].todo = flow->todo;
+               flow->worker[i].flow = flow;
+               /* create the thread and let it wait */
+               MT_create_thread(&flow->worker[i].tid, runDFLOWworker,
+                                                flow->worker + i, MT_THR_JOINABLE);
+       }
 
-               flow->worker = NULL;
-               flow->nway = GDKnr_threads ? GDKnr_threads : 1;
-               flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
-               for (i = 0; i < flow->nway; i++) {
-                       flow->worker[i].id = workerid++;
-                       flow->worker[i].todo = flow->todo;
-                       flow->worker[i].flow = flow;
-                       /* create the thread and let it wait */
-                       MT_create_thread(&flow->worker[i].tid, runDFLOWworker, flow->worker + i, MT_THR_DETACHED);
-               }
-       }
        /* keep real block count, exclude brackets */
        flow->start = startpc + 1;
        flow->stop = stoppc;
@@ -1231,18 +1234,23 @@ str runMALdataflow(Client cntxt, MalBlkP
        flow->nodes = (int*)GDKzalloc(sizeof(int) * size);
        flow->edges = (int*)GDKzalloc(sizeof(int) * size);
        DFLOWinit(flow, cntxt, mb, stk, size);
-       mal_unset_lock(mal_contextLock, "runMALdataflow");
 
        ret = DFLOWscheduler(flow);
+
+       MT_lock_set(&flow->termlock, "runMALdataflow");
+       flow->terminate = 1;
+       MT_lock_unset(&flow->termlock, "runMALdataflow");
+       for (i = 0; i < flow->nway; i++)
+               MT_sema_up(&flow->todo->s, "runMALdataflow");
+       for (i = 0; i < flow->nway; i++)
+               MT_join_thread(flow->worker[i].tid);
+
+       q_destroy(flow->done);
+       q_destroy(flow->todo);
+       GDKfree(flow->worker);
        GDKfree(flow->status);
-       flow->status = 0;
        GDKfree(flow->edges);
-       flow->edges = 0;
        GDKfree(flow->nodes);
-       flow->nodes = 0;
-       mal_set_lock(mal_contextLock, "runMALdataflow");
-       flow->free = flows;
-       flows = flow;
-       mal_unset_lock(mal_contextLock, "runMALdataflow");
+       GDKfree(flow);
        return ret;
 }
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to