Changeset: a626b67c7c58 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/a626b67c7c58
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Keep number of spare threads in check.
Use --set dataflow_max_free=N to set maximum number of free threads
(default gdk_nr_threads).


diffs (86 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,7 +80,7 @@ typedef struct DATAFLOW {
 
 static struct worker {
        MT_Id id;
-       enum {IDLE, WAITING, RUNNING, FREE } flag;
+       enum {IDLE, WAITING, RUNNING, FREE, EXITED } flag;
        ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
        char *errbuf;              /* GDKerrbuf so that we can allocate before 
fork */
        MT_Sema s;
@@ -89,8 +89,11 @@ static struct worker {
 } workers[THREADS];
 /* heads of two mutually exclusive linked lists, both using the .next
  * field in the worker struct */
+static int exited_workers = -1;        /* to be joined threads */
 static int idle_workers = -1;  /* idle workers (no thread associated) */
 static int free_workers = -1;  /* free workers (thread doing nothing) */
+static int free_count = 0;             /* number of free threads */
+static int free_max = 0;               /* max number of spare free threads */
 
 static Queue *todo = 0;        /* pending instructions */
 
@@ -104,6 +107,7 @@ mal_dataflow_reset(void)
        stopMALdataflow();
        memset((char*) workers, 0,  sizeof(workers));
        idle_workers = -1;
+       exited_workers = -1;
        if( todo) {
                GDKfree(todo->data);
                MT_lock_destroy(&todo->l);
@@ -449,6 +453,14 @@ DFLOWworker(void *T)
                        MT_lock_unset(&dataflowLock);
                        break;
                }
+               if (free_count >= free_max) {
+                       t->flag = EXITED;
+                       t->next = exited_workers;
+                       exited_workers = t->self;
+                       MT_lock_unset(&dataflowLock);
+                       break;
+               }
+               free_count++;
                t->flag = FREE;
                assert(free_workers != t->self);
                t->next = free_workers;
@@ -459,7 +471,6 @@ DFLOWworker(void *T)
                        break;
                assert(t->flag == WAITING);
        }
-
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
 }
@@ -486,6 +497,7 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock);
                return 0;
        }
+       free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : 
GDKnr_threads);
        todo = q_create(2048, "todo");
        if (todo == NULL) {
                MT_lock_unset(&dataflowLock);
@@ -806,9 +818,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         * tasks for the current client to compensate for our waiting
         * until all work is done */
        MT_lock_set(&dataflowLock);
+       /* join with already exited threads */
+       while ((i = exited_workers) >= 0) {
+               assert(workers[i].flag == EXITED);
+               exited_workers = workers[i].next;
+               workers[i].flag = IDLE;
+               MT_lock_unset(&dataflowLock);
+               MT_join_thread(workers[i].id);
+               MT_lock_set(&dataflowLock);
+               workers[i].next = idle_workers;
+               idle_workers = i;
+       }
        assert(cntxt != NULL);
        if ((i = free_workers) >= 0) {
                assert(workers[i].flag == FREE);
+               assert(free_count > 0);
+               free_count--;
                free_workers = workers[i].next;
                workers[i].flag = WAITING;
                ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to