Changeset: f185e3fa6c50 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f185e3fa6c50
Modified Files:
	monetdb5/mal/mal.c
	monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Merge with Feb2013 branch. diffs (123 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -64,6 +64,7 @@ typedef struct queue { FlowEvent *data; MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ + MT_Sema e; /* synchronize exiting of thread */ } Queue; /* @@ -130,6 +131,7 @@ q_create(int sz, const char *name) (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */ MT_lock_init(&q->l, name); MT_sema_init(&q->s, 0, name); + MT_sema_init(&q->e, 0, name); return q; } @@ -218,6 +220,7 @@ q_dequeue(Queue *q) if (q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l, "q_dequeue"); + MT_sema_up(&q->e, "q_dequeue"); return NULL; } assert(q->last > 0); @@ -379,24 +382,40 @@ DFLOWworker(void *T) * typically is equal to the number of cores * The workers are assembled in a local table to enable debugging. */ -static void +static int DFLOWinitialize(void) { int i, limit; + int created = 0; MT_lock_set(&mal_contextLock, "DFLOWinitialize"); if (todo) { + /* somebody else beat us to it */ MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); - return; + return 0; } todo = q_create(2048, "DFLOWinitialize"); + if (todo == NULL) { + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return -1; + } limit = GDKnr_threads ? 
GDKnr_threads : 1; for (i = 0; i < limit; i++) { workers[i].flag = RUNNING; if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) workers[i].flag = IDLE; + else + created++; + } + if (created == 0) { + /* no threads created */ + q_destroy(todo); + todo = NULL; + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return -1; } MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return 0; } /* @@ -632,6 +651,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m if (stk == NULL) throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL"); ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0); + *ret = FALSE; if (stk->cmd) { *ret = TRUE; return MAL_SUCCEED; @@ -641,7 +661,12 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* check existence of workers */ if (todo == NULL) { - DFLOWinitialize(); /* create the whole pool */ + /* create thread pool */ + if (DFLOWinitialize() < 0) { + /* no threads created, run serially */ + *ret = TRUE; + return MAL_SUCCEED; + } i = THREADS; /* we didn't create an extra thread */ } else { /* create one more worker to compensate for our waiting until @@ -650,11 +675,21 @@ runMALdataflow(Client cntxt, MalBlkPtr m for (i = 0; i < THREADS; i++) { if (workers[i].flag == IDLE) { workers[i].flag = RUNNING; - MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE); + if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { + /* cannot start new thread, run serially */ + *ret = TRUE; + MT_lock_unset(&mal_contextLock, "runMALdataflow"); + return MAL_SUCCEED; + } break; } } MT_lock_unset(&mal_contextLock, "runMALdataflow"); + if (i == THREADS) { + /* no empty threads slots found, run serially */ + *ret = TRUE; + return MAL_SUCCEED; + } } assert(todo); @@ -723,6 +758,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m todo->exitcount++; MT_lock_unset(&todo->l, "runMALdataflow"); MT_sema_up(&todo->s, "runMALdataflow"); + MT_sema_down(&todo->e, 
"runMALdataflow"); MT_lock_set(&mal_contextLock, "runMALdataflow"); for (i = 0; i < THREADS; i++) { if (workers[i].flag == EXITED) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list