Changeset: 28efffc236b7 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=28efffc236b7
Modified Files:
        gdk/gdk_calc_private.h
        gdk/gdk_private.h
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_private.h
Branch: default
Log Message:

Merge with Jan2014 branch.


diffs (173 lines):

diff --git a/gdk/gdk_calc_private.h b/gdk/gdk_calc_private.h
--- a/gdk/gdk_calc_private.h
+++ b/gdk/gdk_calc_private.h
@@ -19,6 +19,10 @@
 
 /* This file contains shared definitions for gdk_calc.c and gdk_aggr.c */
 
+#ifndef LIBGDK
+#error this file should not be included outside its source directory
+#endif
+
 #ifdef HAVE_LONG_LONG
 typedef unsigned long long ulng;
 #else
diff --git a/gdk/gdk_private.h b/gdk/gdk_private.h
--- a/gdk/gdk_private.h
+++ b/gdk/gdk_private.h
@@ -19,6 +19,10 @@
 
 /* This file should not be included in any file outside of this directory */
 
+#ifndef LIBGDK
+#error this file should not be included outside its source directory
+#endif
+
 /*
  * The different parts of which a BAT consists are physically stored
  * next to each other in the BATstore type.
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -64,7 +64,6 @@ typedef struct queue {
        int size;       /* size of queue */
        int last;       /* last element in the queue */
        int exitcount;  /* how many threads should exit */
-       int exitedcount;                        /* how many threads have exited */
        FlowEvent *data;
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
@@ -90,7 +89,7 @@ typedef struct DATAFLOW {
 
 static struct worker {
        MT_Id id;
-       enum {IDLE, RUNNING, EXITED} flag;
+       enum {IDLE, RUNNING, JOINING, EXITED} flag;
        Client cntxt;                           /* client we do work for (NULL -> any) */
        MT_Sema s;
 } workers[THREADS];
@@ -137,7 +136,6 @@ q_create(int sz, const char *name)
                return NULL;
        }
        q->exitcount = 0;
-       q->exitedcount = 0;
 
        (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
        MT_lock_init(&q->l, name);
@@ -247,9 +245,6 @@ q_dequeue(Queue *q, Client cntxt)
        if (q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l, "q_dequeue");
-               MT_lock_set(&dataflowLock, "q_dequeue");
-               q->exitedcount++;
-               MT_lock_unset(&dataflowLock, "q_dequeue");
                return NULL;
        }
        assert(q->last > 0);
@@ -461,6 +456,11 @@ DFLOWinitialize(void)
        for (i = 0; i < THREADS; i++)
                MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
        limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
+#ifdef NEED_MT_LOCK_INIT
+       ATOMIC_INIT(exitingLock, "exitingLock");
+       MT_lock_init(&dataflowLock, "dataflowLock");
+#endif
+       MT_lock_set(&dataflowLock, "DFLOWinitialize");
        for (i = 0; i < limit; i++) {
                workers[i].flag = RUNNING;
                workers[i].cntxt = NULL;
@@ -469,6 +469,7 @@ DFLOWinitialize(void)
                else
                        created++;
        }
+       MT_lock_unset(&dataflowLock, "DFLOWinitialize");
        if (created == 0) {
                /* no threads created */
                q_destroy(todo);
@@ -476,10 +477,6 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return -1;
        }
-#ifdef NEED_MT_LOCK_INIT
-       ATOMIC_INIT(exitingLock, "exitingLock");
-       MT_lock_init(&dataflowLock, "dataflowLock");
-#endif
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
        return 0;
 }
@@ -769,20 +766,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         * until all work is done */
        MT_lock_set(&dataflowLock, "runMALdataflow");
        /* join with already exited threads */
-       while (todo->exitedcount > 0) {
-               for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag == EXITED) {
-                               todo->exitedcount--;
-                               workers[i].flag = IDLE;
-                               workers[i].cntxt = NULL;
-                               MT_lock_unset(&dataflowLock, "runMALdataflow");
-                               MT_join_thread(workers[i].id);
-                               MT_lock_set(&dataflowLock, "runMALdataflow");
-                               break;
+       {
+               int joined;
+               do {
+                       joined = 0;
+                       for (i = 0; i < THREADS; i++) {
+                               if (workers[i].flag == EXITED) {
+                                       workers[i].flag = JOINING;
+                                       workers[i].cntxt = NULL;
+                                       joined = 1;
+                                       MT_lock_unset(&dataflowLock, "runMALdataflow");
+                                       MT_join_thread(workers[i].id);
+                                       MT_lock_set(&dataflowLock, "runMALdataflow");
+                                       workers[i].flag = IDLE;
+                               }
                        }
-               }
-               if (i == THREADS)
-                       break;
+               } while (joined);
        }
        for (i = 0; i < THREADS; i++) {
                if (workers[i].flag == IDLE) {
@@ -805,13 +804,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                                /* not doing a recursive call: create specific worker */
                                workers[i].cntxt = cntxt;
                        }
+                       workers[i].flag = RUNNING;
                        if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
                                /* cannot start new thread, run serially */
                                *ret = TRUE;
+                               workers[i].flag = IDLE;
                                MT_lock_unset(&dataflowLock, "runMALdataflow");
                                return MAL_SUCCEED;
                        }
-                       workers[i].flag = RUNNING;
                        break;
                }
        }
@@ -902,7 +902,8 @@ stopMALdataflow(void)
                        MT_sema_up(&todo->s, "stopMALdataflow");
                MT_lock_set(&dataflowLock, "stopMALdataflow");
                for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag != IDLE) {
+                       if (workers[i].flag != IDLE && workers[i].flag != JOINING) {
+                               workers[i].flag = JOINING;
                                MT_lock_unset(&dataflowLock, "stopMALdataflow");
                                MT_join_thread(workers[i].id);
                                MT_lock_set(&dataflowLock, "stopMALdataflow");
diff --git a/monetdb5/mal/mal_private.h b/monetdb5/mal/mal_private.h
--- a/monetdb5/mal/mal_private.h
+++ b/monetdb5/mal/mal_private.h
@@ -19,6 +19,10 @@
 
 /* This file should not be included in any file outside of this directory */
 
+#ifndef LIBMAL
+#error this file should not be included outside its source directory
+#endif
+
 #ifdef FREECLIENT
 /* FREECLIENT is defined in the same file as Client */
 extern void MCexitClient(Client c);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to