Changeset: f185e3fa6c50 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f185e3fa6c50
Modified Files:
        monetdb5/mal/mal.c
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Merge with Feb2013 branch.


diffs (123 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -64,6 +64,7 @@ typedef struct queue {
        FlowEvent *data;
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
+       MT_Sema e;      /* synchronize exiting of thread */
 } Queue;
 
 /*
@@ -130,6 +131,7 @@ q_create(int sz, const char *name)
        (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
        MT_lock_init(&q->l, name);
        MT_sema_init(&q->s, 0, name);
+       MT_sema_init(&q->e, 0, name);
        return q;
 }
 
@@ -218,6 +220,7 @@ q_dequeue(Queue *q)
        if (q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l, "q_dequeue");
+               MT_sema_up(&q->e, "q_dequeue");
                return NULL;
        }
        assert(q->last > 0);
@@ -379,24 +382,40 @@ DFLOWworker(void *T)
  * typically is equal to the number of cores
  * The workers are assembled in a local table to enable debugging.
  */
-static void
+static int
 DFLOWinitialize(void)
 {
        int i, limit;
+       int created = 0;
 
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
        if (todo) {
+               /* somebody else beat us to it */
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               return;
+               return 0;
        }
        todo = q_create(2048, "DFLOWinitialize");
+       if (todo == NULL) {
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+               return -1;
+       }
        limit = GDKnr_threads ? GDKnr_threads : 1;
        for (i = 0; i < limit; i++) {
                workers[i].flag = RUNNING;
                if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) 
&workers[i], MT_THR_JOINABLE) < 0)
                        workers[i].flag = IDLE;
+               else
+                       created++;
+       }
+       if (created == 0) {
+               /* no threads created */
+               q_destroy(todo);
+               todo = NULL;
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+               return -1;
        }
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+       return 0;
 }
 
 /*
@@ -632,6 +651,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (stk == NULL)
                throw(MAL, "dataflow", "runMALdataflow(): Called with stk == 
NULL");
        ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
+       *ret = FALSE;
        if (stk->cmd) {
                *ret = TRUE;
                return MAL_SUCCEED;
@@ -641,7 +661,12 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 
        /* check existence of workers */
        if (todo == NULL) {
-               DFLOWinitialize();              /* create the whole pool */
+               /* create thread pool */
+               if (DFLOWinitialize() < 0) {
+                       /* no threads created, run serially */
+                       *ret = TRUE;
+                       return MAL_SUCCEED;
+               }
                i = THREADS;                    /* we didn't create an extra 
thread */
        } else {
                /* create one more worker to compensate for our waiting until
@@ -650,11 +675,21 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                for (i = 0; i < THREADS; i++) {
                        if (workers[i].flag == IDLE) {
                                workers[i].flag = RUNNING;
-                               MT_create_thread(&workers[i].id, DFLOWworker, 
(void *) &workers[i], MT_THR_JOINABLE);
+                               if (MT_create_thread(&workers[i].id, 
DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
+                                       /* cannot start new thread, run 
serially */
+                                       *ret = TRUE;
+                                       MT_lock_unset(&mal_contextLock, 
"runMALdataflow");
+                                       return MAL_SUCCEED;
+                               }
                                break;
                        }
                }
                MT_lock_unset(&mal_contextLock, "runMALdataflow");
+               if (i == THREADS) {
+                       /* no empty threads slots found, run serially */
+                       *ret = TRUE;
+                       return MAL_SUCCEED;
+               }
        }
        assert(todo);
 
@@ -723,6 +758,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                todo->exitcount++;
                MT_lock_unset(&todo->l, "runMALdataflow");
                MT_sema_up(&todo->s, "runMALdataflow");
+               MT_sema_down(&todo->e, "runMALdataflow");
                MT_lock_set(&mal_contextLock, "runMALdataflow");
                for (i = 0; i < THREADS; i++) {
                        if (workers[i].flag == EXITED) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to