Changeset: 6384e8cbc8a3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/6384e8cbc8a3
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Use linked lists instead of running through all entries.


diffs (211 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -20,7 +20,7 @@
  * are available or resources become scarce.
  *
  * The flow graphs is organized such that parallel threads can
- * access it mostly without expensive locking and dependent 
+ * access it mostly without expensive locking and dependent
  * variables are easy to find..
  */
 #include "monetdb_config.h"
@@ -84,7 +84,12 @@ static struct worker {
        ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
        char *errbuf;              /* GDKerrbuf so that we can allocate before 
fork */
        MT_Sema s;
+       int next;
 } workers[THREADS];
+/* heads of two mutually exclusive linked lists, both using the .next
+ * field in the worker struct */
+static int exited_workers = -1;        /* to be joined threads */
+static int idle_workers = -1;  /* idle workers (no thread associated) */
 
 static Queue *todo = 0;        /* pending instructions */
 
@@ -97,6 +102,8 @@ mal_dataflow_reset(void)
 {
        stopMALdataflow();
        memset((char*) workers, 0,  sizeof(workers));
+       exited_workers = -1;
+       idle_workers = -1;
        if( todo) {
                GDKfree(todo->data);
                MT_lock_destroy(&todo->l);
@@ -437,6 +444,8 @@ DFLOWworker(void *T)
        GDKsetbuf(0);
        MT_lock_set(&dataflowLock);
        t->flag = EXITED;
+       t->next = exited_workers;
+       exited_workers = (int) (t - workers);
        MT_lock_unset(&dataflowLock);
 }
 
@@ -465,11 +474,14 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock);
                return -1;
        }
+       assert(idle_workers == -1);
        for (i = 0; i < THREADS; i++) {
                char name[MT_NAME_LEN];
                snprintf(name, sizeof(name), "DFLOWsema%d", i);
                MT_sema_init(&workers[i].s, 0, name);
                workers[i].flag = IDLE;
+               workers[i].next = idle_workers;
+               idle_workers = i;
                if (first)                              /* only initialize once 
*/
                        ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
        }
@@ -478,12 +490,15 @@ DFLOWinitialize(void)
        if (limit > THREADS)
                limit = THREADS;
        MT_lock_set(&dataflowLock);
-       for (i = 0; i < limit; i++) {
+       while (limit > 0) {
+               limit--;
+               i = idle_workers;
                workers[i].errbuf = GDKmalloc(GDKMAXERRLEN);
                if (workers[i].errbuf == NULL) {
                        TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer 
for worker");
                        continue;
                }
+               idle_workers = workers[i].next;
                workers[i].flag = RUNNING;
                ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
                char name[MT_NAME_LEN];
@@ -492,6 +507,8 @@ DFLOWinitialize(void)
                        GDKfree(workers[i].errbuf);
                        workers[i].errbuf = NULL;
                        workers[i].flag = IDLE;
+                       workers[i].next = idle_workers;
+                       idle_workers = i;
                } else
                        created++;
        }
@@ -770,64 +787,60 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         * until all work is done */
        MT_lock_set(&dataflowLock);
        /* join with already exited threads */
-       {
-               int joined;
-               do {
-                       joined = 0;
-                       for (i = 0; i < THREADS; i++) {
-                               if (workers[i].flag == EXITED) {
-                                       workers[i].flag = JOINING;
-                                       ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
-                                       joined = 1;
-                                       MT_lock_unset(&dataflowLock);
-                                       MT_join_thread(workers[i].id);
-                                       MT_lock_set(&dataflowLock);
-                                       workers[i].flag = IDLE;
+       while (exited_workers >= 0) {
+               i = exited_workers;
+               exited_workers = workers[i].next;
+               assert(workers[i].flag == EXITED);
+               workers[i].flag = JOINING;
+               MT_lock_unset(&dataflowLock);
+               MT_join_thread(workers[i].id);
+               MT_lock_set(&dataflowLock);
+               workers[i].flag = IDLE;
+               workers[i].next = idle_workers;
+               idle_workers = i;
+       }
+       if ((i = idle_workers) >= 0) {
+               assert(workers[i].flag == IDLE);
+               /* only create specific worker if we are not doing a
+                * recursive call */
+               if (stk->calldepth > 1) {
+                       int j;
+                       MT_Id pid = MT_getpid();
+
+                       /* doing a recursive call: copy specificity from
+                        * current worker to new worker */
+                       ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
+                       for (j = 0; j < THREADS; j++) {
+                               if (workers[j].flag == RUNNING && workers[j].id 
== pid) {
+                                       ATOMIC_PTR_SET(&workers[i].cntxt,
+                                                                  
ATOMIC_PTR_GET(&workers[j].cntxt));
+                                       break;
                                }
                        }
-               } while (joined);
-       }
-       for (i = 0; i < THREADS; i++) {
-               if (workers[i].flag == IDLE) {
-                       /* only create specific worker if we are not doing a
-                        * recursive call */
-                       if (stk->calldepth > 1) {
-                               int j;
-                               MT_Id pid = MT_getpid();
-
-                               /* doing a recursive call: copy specificity from
-                                * current worker to new worker */
-                               ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
-                               for (j = 0; j < THREADS; j++) {
-                                       if (workers[j].flag == RUNNING && 
workers[j].id == pid) {
-                                               
ATOMIC_PTR_SET(&workers[i].cntxt,
-                                                                          
ATOMIC_PTR_GET(&workers[j].cntxt));
-                                               break;
-                                       }
-                               }
-                       } else {
-                               /* not doing a recursive call: create specific 
worker */
-                               ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
-                       }
-                       workers[i].flag = RUNNING;
-                       char name[MT_NAME_LEN];
-                       snprintf(name, sizeof(name), "DFLOWworker%d", i);
-                       if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == 
NULL ||
-                               (workers[i].id = THRcreate(DFLOWworker, (void 
*) &workers[i],
-                                                                               
   MT_THR_JOINABLE, name)) == 0) {
-                               /* cannot start new thread, run serially */
-                               *ret = TRUE;
-                               GDKfree(workers[i].errbuf);
-                               workers[i].errbuf = NULL;
-                               workers[i].flag = IDLE;
-                               MT_lock_unset(&dataflowLock);
-                               return MAL_SUCCEED;
-                       }
-                       break;
+               } else {
+                       /* not doing a recursive call: create specific worker */
+                       ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
+               }
+               idle_workers = workers[i].next;
+               workers[i].flag = RUNNING;
+               char name[MT_NAME_LEN];
+               snprintf(name, sizeof(name), "DFLOWworker%d", i);
+               if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == NULL ||
+                       (workers[i].id = THRcreate(DFLOWworker, (void *) 
&workers[i],
+                                                                          
MT_THR_JOINABLE, name)) == 0) {
+                       /* cannot start new thread, run serially */
+                       *ret = TRUE;
+                       GDKfree(workers[i].errbuf);
+                       workers[i].errbuf = NULL;
+                       workers[i].flag = IDLE;
+                       workers[i].next = idle_workers;
+                       idle_workers = i;
+                       MT_lock_unset(&dataflowLock);
+                       return MAL_SUCCEED;
                }
        }
        MT_lock_unset(&dataflowLock);
-       if (i == THREADS) {
+       if (i < 0) {
                /* no empty thread slots found, run serially */
                *ret = TRUE;
                return MAL_SUCCEED;
@@ -927,7 +940,11 @@ stopMALdataflow(void)
                                MT_join_thread(workers[i].id);
                                MT_lock_set(&dataflowLock);
                        }
-                       workers[i].flag = IDLE;
+                       if (workers[i].flag != IDLE) {
+                               workers[i].flag = IDLE;
+                               workers[i].next = idle_workers;
+                               idle_workers = i;
+                       }
                        MT_sema_destroy(&workers[i].s);
                }
                MT_lock_unset(&dataflowLock);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to