Changeset: 937e0ab26eea for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=937e0ab26eea
Modified Files:
	gdk/gdk_utils.c
	monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Merge with Feb2013 branch. diffs (truncated from 322 to 300 lines): diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c --- a/gdk/gdk_utils.c +++ b/gdk/gdk_utils.c @@ -1018,12 +1018,6 @@ GDKinit(opt *set, int setlen) /* Mserver by default takes 80% of all memory as a default */ GDK_mem_maxsize = GDK_mem_maxsize_max = (size_t) ((double) MT_npages() * (double) MT_pagesize() * 0.815); -#ifdef NATIVE_WIN32 - GDK_mmap_minsize = GDK_mem_maxsize_max; -#else - GDK_mmap_minsize = MIN( 1<<30 , GDK_mem_maxsize_max/6 ); - /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6 ^ */ -#endif GDK_mem_bigsize = 1024*1024; GDKremovedir(DELDIR); BBPinit(); @@ -1066,6 +1060,10 @@ GDKinit(opt *set, int setlen) GDKsetenv(n[i].name, n[i].value); free(n); + GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0); + if (GDKnr_threads == 0) + GDKnr_threads = MT_check_nr_cores(); + if ((p = GDKgetenv("gdk_dbpath")) != NULL && (p = strrchr(p, DIR_SEP)) != NULL) { GDKsetenv("gdk_dbname", p + 1); @@ -1091,6 +1089,13 @@ GDKinit(opt *set, int setlen) } if ((p = GDKgetenv("gdk_mmap_minsize"))) { GDK_mmap_minsize = MAX(REMAP_PAGE_MAXSIZE, (size_t) strtoll(p, NULL, 10)); + } else { +#ifdef NATIVE_WIN32 + GDK_mmap_minsize = GDK_mem_maxsize_max / (GDKnr_threads ? GDKnr_threads : 1); +#else + GDK_mmap_minsize = MIN(1 << 30, (GDK_mem_maxsize_max / 6) / (GDKnr_threads ? GDKnr_threads : 1)); + /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6 */ +#endif } if (GDKgetenv("gdk_mem_pagebits") == NULL) { snprintf(buf, sizeof(buf), "%d", GDK_mem_pagebits); @@ -1105,18 +1110,6 @@ GDKinit(opt *set, int setlen) GDKsetenv("monet_pid", buf); } - GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0); - if (GDKnr_threads == 0) - GDKnr_threads = MT_check_nr_cores(); -#ifdef NATIVE_WIN32 - GDK_mmap_minsize /= (GDKnr_threads ? GDKnr_threads : 1); -#else - /* WARNING: This unconditionally overwrites above settings, */ - /* incl. setting via MonetDB env. var. "gdk_mmap_minsize" ! 
*/ - GDK_mmap_minsize = MIN( 1<<30 , (GDK_mem_maxsize_max/6) / (GDKnr_threads ? GDKnr_threads : 1) ); - /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6 ^ */ -#endif - if ((p = mo_find_option(set, setlen, "gdk_vmtrim")) == NULL || strcasecmp(p, "yes") == 0) MT_create_thread(&GDKvmtrim_id, GDKvmtrim, &GDK_mem_maxsize, diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -88,6 +88,8 @@ typedef struct DATAFLOW { static struct worker { MT_Id id; enum {IDLE, RUNNING, EXITED} flag; + Client cntxt; /* client we do work for (NULL -> any) */ + MT_Sema s; } workers[THREADS]; static Queue *todo = 0; /* pending instructions */ static int volatile exiting = 0; @@ -207,16 +209,33 @@ q_requeue(Queue *q, FlowEvent d) } #endif -static void * -q_dequeue(Queue *q) +static FlowEvent +q_dequeue(Queue *q, Client cntxt) { - void *r = NULL; + FlowEvent r = NULL; assert(q); MT_sema_down(&q->s, "q_dequeue"); if (exiting) return NULL; MT_lock_set(&q->l, "q_dequeue"); + if (cntxt) { + int i; + + for (i = q->last - 1; i >= 0; i--) { + if (q->data[i]->flow->cntxt == cntxt) { + r = q->data[i]; + q->last--; + while (i < q->last) { + q->data[i] = q->data[i + 1]; + i++; + } + break; + } + } + MT_lock_unset(&q->l, "q_dequeue"); + return r; + } if (q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l, "q_dequeue"); @@ -228,7 +247,7 @@ q_dequeue(Queue *q) assert(q->last > 0); if (q->last > 0) { /* LIFO favors garbage collection */ - r = (void*) q->data[--q->last]; + r = q->data[--q->last]; q->data[q->last] = 0; } /* else: terminating */ @@ -281,10 +300,28 @@ DFLOWworker(void *T) GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; + if (t->cntxt) { + /* wait until we are allowed to start working */ + MT_sema_down(&t->s, "DFLOWworker"); + } while (1) { if (fnxt == 0) { - if ((fe = q_dequeue(todo)) == NULL) - break;; + Client cntxt = t->cntxt; + fe = q_dequeue(todo, 
cntxt); + if (fe == NULL) { + if (cntxt) { + /* we're not done yet with work for the current + * client (as far as we know), so give up the CPU + * and let the scheduler enter some more work, but + * first compensate for the down we did in + * dequeue */ + MT_sema_up(&todo->s, "DFLOWworker"); + MT_sleep_ms(1); + continue; + } + /* no more work to be done: exit */ + break; + } } else fe = fnxt; if (exiting) { @@ -399,9 +436,12 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return -1; } - limit = GDKnr_threads ? GDKnr_threads : 1; + for (i = 0; i < THREADS; i++) + MT_sema_init(&workers[i].s, 0, "DFLOWinitialize"); + limit = GDKnr_threads ? GDKnr_threads - 1 : 0; for (i = 0; i < limit; i++) { workers[i].flag = RUNNING; + workers[i].cntxt = NULL; if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) workers[i].flag = IDLE; else @@ -563,7 +603,7 @@ static void showFlowEvent(DataFlow flow, */ static str -DFLOWscheduler(DataFlow flow) +DFLOWscheduler(DataFlow flow, struct worker *w) { int last; int i; @@ -600,11 +640,12 @@ DFLOWscheduler(DataFlow flow) PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } MT_lock_unset(&flow->flowlock, "DFLOWscheduler"); + MT_sema_up(&w->s, "DFLOWscheduler"); PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n", actions); while (actions != tasks ) { - f = q_dequeue(flow->done); + f = q_dequeue(flow->done, NULL); if (exiting) break; if (f == NULL) @@ -632,6 +673,9 @@ DFLOWscheduler(DataFlow flow) } MT_lock_unset(&flow->flowlock, "DFLOWscheduler"); } + /* release the worker from its specific task (turn it into a + * generic worker) */ + w->cntxt = NULL; /* wrap up errors */ assert(flow->done->last == 0); if (flow->error ) { @@ -641,6 +685,20 @@ DFLOWscheduler(DataFlow flow) return ret; } +/* We create a pool of GDKnr_threads-1 generic workers, that is, + * workers that will take on jobs from any 
clients. In addition, we + * create a single specific worker per client (i.e. each time we enter + * here). This specific worker will only do work for the client for + * which it was started. In this way we can guarantee that there will + * always be progress for the client, even if all other workers are + * doing something big. + * + * When all jobs for a client have been done (there are no more + * entries for the client in the queue), the specific worker turns + * itself into a generic worker. At the same time, we signal that one + * generic worker should exit and this function returns. In this way + * we make sure that there are once again GDKnr_threads-1 generic + * workers. */ str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk) { @@ -670,44 +728,64 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* check existence of workers */ if (todo == NULL) { /* create thread pool */ - if (DFLOWinitialize() < 0) { + if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) { /* no threads created, run serially */ *ret = TRUE; return MAL_SUCCEED; } i = THREADS; /* we didn't create an extra thread */ - } else - if( stk->calldepth){ - /* create one more worker to compensate for our waiting until - * all work is done, provided we are about to perform a recursive call */ - MT_lock_set(&mal_contextLock, "runMALdataflow"); - for (i = 0; i < THREADS && todo->exitedcount > 0; i++) { - if (workers[i].flag == EXITED) { - todo->exitedcount--; - workers[i].flag = IDLE; - MT_join_thread(workers[i].id); + } + assert(todo); + /* in addition, create one more worker that will only execute + * tasks for the current client to compensate for our waiting + * until all work is done */ + MT_lock_set(&mal_contextLock, "runMALdataflow"); + /* join with already exited threads */ + for (i = 0; i < THREADS && todo->exitedcount > 0; i++) { + if (workers[i].flag == EXITED) { + todo->exitedcount--; + workers[i].flag = IDLE; + workers[i].cntxt = NULL; + 
MT_join_thread(workers[i].id); + } + } + for (i = 0; i < THREADS; i++) { + if (workers[i].flag == IDLE) { + /* only create specific worker if we are not doing a + * recursive call */ + if (stk->calldepth > 1) { + int j; + MT_Id pid = MT_getpid(); + + /* doing a recursive call: copy specificity from + * current worker to new worker */ + workers[i].cntxt = NULL; + for (j = 0; j < THREADS; j++) { + if (workers[j].flag == RUNNING && workers[j].id == pid) { + workers[i].cntxt = workers[j].cntxt; + break; + } + } + } else { + /* not doing a recursive call: create specific worker */ + workers[i].cntxt = cntxt; } + if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { + /* cannot start new thread, run serially */ + *ret = TRUE; + MT_lock_unset(&mal_contextLock, "runMALdataflow"); + return MAL_SUCCEED; + } + workers[i].flag = RUNNING; + break; } - for (i = 0; i < THREADS; i++) { - if (workers[i].flag == IDLE) { - if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) { - /* cannot start new thread, run serially */ - *ret = TRUE; - MT_lock_unset(&mal_contextLock, "runMALdataflow"); - return MAL_SUCCEED; - } - workers[i].flag = RUNNING; - break; - } - } - MT_lock_unset(&mal_contextLock, "runMALdataflow"); - if (i == THREADS) { - /* no empty threads slots found, run serially */ - *ret = TRUE; - return MAL_SUCCEED; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list