Changeset: df0b118accb2 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=df0b118accb2

Modified Files:
	monetdb5/mal/mal_dataflow.c

Branch: default

Log Message:
Fixed a race condition in the dataflow scheduler. If one of the dataflow
threads came across an error, it would cause DFLOWscheduler to return.
This triggered a cleanup of various dataflow structures. However, other
dataflow threads were still using those structures and could then cause
a crash. The problem is solved by signalling all threads that they
should terminate, and then joining those threads before cleanup.

diffs (162 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -86,6 +86,8 @@ typedef struct DataFlow {
 	int nway;		/* number of workers */
 	FlowTask *worker;	/* worker threads for the client */
 	struct DataFlow *free;	/* free list */
+	int terminate;		/* set if we need to terminate */
+	MT_Lock termlock;	/* lock to protect the above */
 } *DataFlow, DataFlowRec;
 
 /* does not seem to have a major impact */
@@ -249,14 +251,12 @@ q_create(int sz)
 	return q;
 }
 
-/*
 static void
 q_destroy(queue *q)
 {
 	GDKfree(q->data);
 	GDKfree(q);
 }
-*/
 
 /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */
 /* we might actually sort it for better scheduling behavior */
@@ -314,9 +314,10 @@ q_dequeue(queue *q)
 	MT_sema_down(&q->s, "q_dequeue");
 	MT_lock_set(&q->l, "q_dequeue");
-	assert(q->last > 0);
-	/* LIFO favors garbage collection */
-	r = q->data[--q->last];
+	if (q->last > 0)
+		/* LIFO favors garbage collection */
+		r = q->data[--q->last];
+	/* else: terminating */
 	/* try out random draw *
 	{ int i;
@@ -824,6 +825,12 @@ runDFLOWworker(void *t)
 		local = nxtfs != 0;
 		if (nxtfs == 0) {
 			fs = (FlowStatus)q_dequeue(task->todo);
+			MT_lock_set(&task->flow->termlock, "runDFLOWworker");
+			if (task->flow->terminate) {
+				MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
+				break;
+			}
+			MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
 #ifdef USE_DFLOW_ADMISSION
 			if (DFLOWadmission(fs->argclaim, fs->hotclaim)) {
@@ -1168,7 +1175,6 @@ DFLOWscheduler(DataFlow flow)
 	return ret;
 }
 
-static DataFlow flows = NULL;
 static int workerid = 0;
 
 str
 runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc,
@@ -1177,6 +1183,7 @@ str runMALdataflow(Client cntxt, MalBlkP
 	DataFlow flow = NULL;
 	str ret = MAL_SUCCEED;
 	int size;
+	int i;
 
 #ifdef DEBUG_FLOW
 	mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc, stoppc);
@@ -1191,37 +1198,33 @@ str runMALdataflow(Client cntxt, MalBlkP
 		return MAL_SUCCEED;
 	assert(stoppc > startpc);
 
-	mal_set_lock(mal_contextLock, "runMALdataflow");
-	flow = flows;
-	if (flow) {
-		flows = flow->free;
-	} else {
-		int i;
+	flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+	MT_lock_init(&flow->termlock, "runMALdataflow");
 
-		flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+	/* seems enough for the time being */
+	flow->done = q_create(2048);
+	flow->todo = q_create(2048);
 
-		/* seems enough for the time being */
-		flow->done = q_create(2048);
-		flow->todo = q_create(2048);
+	/* queues are available? */
+	if (flow->done == NULL || flow->todo == NULL) {
+		return MAL_SUCCEED;
+	}
 
-		/* queues are available? */
-		if (flow->done == NULL || flow->todo == NULL) {
-			mal_unset_lock(mal_contextLock, "runMALdataflow");
-			return MAL_SUCCEED;
-		}
+	flow->worker = NULL;
+	flow->nway = GDKnr_threads ? GDKnr_threads : 1;
+	if (flow->nway > stoppc - startpc)
+		flow->nway = stoppc - startpc;
+	flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
+	for (i = 0; i < flow->nway; i++) {
+		flow->worker[i].id = workerid++;
+		flow->worker[i].todo = flow->todo;
+		flow->worker[i].flow = flow;
+		/* create the thread and let it wait */
+		MT_create_thread(&flow->worker[i].tid, runDFLOWworker,
+				 flow->worker + i, MT_THR_JOINABLE);
+	}
 
-		flow->worker = NULL;
-		flow->nway = GDKnr_threads ? GDKnr_threads : 1;
-		flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
-		for (i = 0; i < flow->nway; i++) {
-			flow->worker[i].id = workerid++;
-			flow->worker[i].todo = flow->todo;
-			flow->worker[i].flow = flow;
-			/* create the thread and let it wait */
-			MT_create_thread(&flow->worker[i].tid, runDFLOWworker, flow->worker + i, MT_THR_DETACHED);
-		}
-	}
 	/* keep real block count, exclude brackets */
 	flow->start = startpc + 1;
 	flow->stop = stoppc;
@@ -1231,18 +1234,23 @@ str runMALdataflow(Client cntxt, MalBlkP
 	flow->nodes = (int*)GDKzalloc(sizeof(int) * size);
 	flow->edges = (int*)GDKzalloc(sizeof(int) * size);
 	DFLOWinit(flow, cntxt, mb, stk, size);
-	mal_unset_lock(mal_contextLock, "runMALdataflow");
 	ret = DFLOWscheduler(flow);
+
+	MT_lock_set(&flow->termlock, "runMALdataflow");
+	flow->terminate = 1;
+	MT_lock_unset(&flow->termlock, "runMALdataflow");
+	for (i = 0; i < flow->nway; i++)
+		MT_sema_up(&flow->todo->s, "runMALdataflow");
+	for (i = 0; i < flow->nway; i++)
+		MT_join_thread(flow->worker[i].tid);
+
+	q_destroy(flow->done);
+	q_destroy(flow->todo);
+	GDKfree(flow->worker);
 	GDKfree(flow->status);
-	flow->status = 0;
 	GDKfree(flow->edges);
-	flow->edges = 0;
 	GDKfree(flow->nodes);
-	flow->nodes = 0;
-	mal_set_lock(mal_contextLock, "runMALdataflow");
-	flow->free = flows;
-	flows = flow;
-	mal_unset_lock(mal_contextLock, "runMALdataflow");
+	GDKfree(flow);
 
 	return ret;
 }
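For readers skimming the diff, the shutdown protocol introduced here boils
down to: set a terminate flag under a lock, post the todo semaphore once per
worker so that every thread blocked in q_dequeue wakes up, and only then join
all workers before freeing the shared queues. Below is a minimal stand-alone
sketch of that pattern, not MonetDB code: it uses plain pthreads and POSIX
semaphores instead of the MT_* wrappers, and the names (struct pool, worker,
shutdown_workers, NWORKERS) are made up for illustration. Compile with
-pthread.

/*
 * Hypothetical sketch of the signal-then-join shutdown pattern.
 * Not MonetDB code; names and structure are illustrative only.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

#define NWORKERS 4

struct pool {
	sem_t todo;               /* counts queued work items */
	pthread_mutex_t termlock; /* protects the terminate flag */
	int terminate;            /* set when workers must exit */
	pthread_t tid[NWORKERS];
};

static void *
worker(void *arg)
{
	struct pool *p = arg;

	for (;;) {
		sem_wait(&p->todo);       /* may wake with no work: check flag */
		pthread_mutex_lock(&p->termlock);
		if (p->terminate) {
			pthread_mutex_unlock(&p->termlock);
			break;            /* exit before shared state is freed */
		}
		pthread_mutex_unlock(&p->termlock);
		/* ... process one work item here ... */
	}
	return NULL;
}

/* Signal every worker, then join them all before any cleanup. */
static void
shutdown_workers(struct pool *p)
{
	int i;

	pthread_mutex_lock(&p->termlock);
	p->terminate = 1;
	pthread_mutex_unlock(&p->termlock);
	for (i = 0; i < NWORKERS; i++)
		sem_post(&p->todo);       /* wake each blocked worker once */
	for (i = 0; i < NWORKERS; i++)
		pthread_join(p->tid[i], NULL);
	/* only now is it safe to free queues and the pool itself */
}

int
main(void)
{
	struct pool *p = calloc(1, sizeof(*p));
	int i;

	sem_init(&p->todo, 0, 0);
	pthread_mutex_init(&p->termlock, NULL);
	for (i = 0; i < NWORKERS; i++)
		pthread_create(&p->tid[i], NULL, worker, p);
	shutdown_workers(p);
	sem_destroy(&p->todo);
	pthread_mutex_destroy(&p->termlock);
	free(p);
	puts("all workers joined, cleanup done");
	return 0;
}

Two details of the diff tie into this pattern: the workers are now created
with MT_THR_JOINABLE rather than MT_THR_DETACHED, since only joinable threads
can be waited for with MT_join_thread, and q_dequeue no longer asserts a
non-empty queue, because a worker woken during termination may find no work
and must check the terminate flag instead.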
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list