Changeset: a626b67c7c58 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/a626b67c7c58 Modified Files: monetdb5/mal/mal_dataflow.c Branch: default Log Message:
Keep number of spare threads in check. Use --set dataflow_max_free=N to set maximum number of free threads (default gdk_nr_threads). diffs (86 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -80,7 +80,7 @@ typedef struct DATAFLOW { static struct worker { MT_Id id; - enum {IDLE, WAITING, RUNNING, FREE } flag; + enum {IDLE, WAITING, RUNNING, FREE, EXITED } flag; ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */ char *errbuf; /* GDKerrbuf so that we can allocate before fork */ MT_Sema s; @@ -89,8 +89,11 @@ static struct worker { } workers[THREADS]; /* heads of two mutually exclusive linked lists, both using the .next * field in the worker struct */ +static int exited_workers = -1; /* to be joined threads */ static int idle_workers = -1; /* idle workers (no thread associated) */ static int free_workers = -1; /* free workers (thread doing nothing) */ +static int free_count = 0; /* number of free threads */ +static int free_max = 0; /* max number of spare free threads */ static Queue *todo = 0; /* pending instructions */ @@ -104,6 +107,7 @@ mal_dataflow_reset(void) stopMALdataflow(); memset((char*) workers, 0, sizeof(workers)); idle_workers = -1; + exited_workers = -1; if( todo) { GDKfree(todo->data); MT_lock_destroy(&todo->l); @@ -449,6 +453,14 @@ DFLOWworker(void *T) MT_lock_unset(&dataflowLock); break; } + if (free_count >= free_max) { + t->flag = EXITED; + t->next = exited_workers; + exited_workers = t->self; + MT_lock_unset(&dataflowLock); + break; + } + free_count++; t->flag = FREE; assert(free_workers != t->self); t->next = free_workers; @@ -459,7 +471,6 @@ DFLOWworker(void *T) break; assert(t->flag == WAITING); } - GDKfree(GDKerrbuf); GDKsetbuf(0); } @@ -486,6 +497,7 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock); return 0; } + free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : GDKnr_threads); todo = q_create(2048, "todo"); if (todo == NULL) { MT_lock_unset(&dataflowLock); @@ -806,9 +818,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m * tasks for the current client to compensate for our waiting * until all work is done */ MT_lock_set(&dataflowLock); + /* join with already exited threads */ + while ((i = exited_workers) >= 0) { + assert(workers[i].flag == EXITED); + exited_workers = workers[i].next; + workers[i].flag = IDLE; + MT_lock_unset(&dataflowLock); + MT_join_thread(workers[i].id); + MT_lock_set(&dataflowLock); + workers[i].next = idle_workers; + idle_workers = i; + } assert(cntxt != NULL); if ((i = free_workers) >= 0) { assert(workers[i].flag == FREE); + assert(free_count > 0); + free_count--; free_workers = workers[i].next; workers[i].flag = WAITING; ATOMIC_PTR_SET(&workers[i].cntxt, cntxt); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list