Changeset: 937e0ab26eea for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=937e0ab26eea
Modified Files:
        gdk/gdk_utils.c
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Merge with Feb2013 branch.


diffs (truncated from 322 to 300 lines):

diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -1018,12 +1018,6 @@ GDKinit(opt *set, int setlen)
 
        /* Mserver by default takes 80% of all memory as a default */
        GDK_mem_maxsize = GDK_mem_maxsize_max = (size_t) ((double) MT_npages() * (double) MT_pagesize() * 0.815);
-#ifdef NATIVE_WIN32
-       GDK_mmap_minsize = GDK_mem_maxsize_max;
-#else
-       GDK_mmap_minsize = MIN( 1<<30 , GDK_mem_maxsize_max/6 );
-       /*   per op:  2 args + 1 res, each with head & tail  =>  (2+1)*2 = 6  ^ */
-#endif
        GDK_mem_bigsize = 1024*1024;
        GDKremovedir(DELDIR);
        BBPinit();
@@ -1066,6 +1060,10 @@ GDKinit(opt *set, int setlen)
                GDKsetenv(n[i].name, n[i].value);
        free(n);
 
+       GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0);
+       if (GDKnr_threads == 0)
+               GDKnr_threads = MT_check_nr_cores();
+
        if ((p = GDKgetenv("gdk_dbpath")) != NULL &&
            (p = strrchr(p, DIR_SEP)) != NULL) {
                GDKsetenv("gdk_dbname", p + 1);
@@ -1091,6 +1089,13 @@ GDKinit(opt *set, int setlen)
        }
        if ((p = GDKgetenv("gdk_mmap_minsize"))) {
                GDK_mmap_minsize = MAX(REMAP_PAGE_MAXSIZE, (size_t) strtoll(p, NULL, 10));
+       } else {
+#ifdef NATIVE_WIN32
+               GDK_mmap_minsize = GDK_mem_maxsize_max / (GDKnr_threads ? GDKnr_threads : 1);
+#else
+               GDK_mmap_minsize = MIN(1 << 30, (GDK_mem_maxsize_max / 6) / (GDKnr_threads ? GDKnr_threads : 1));
+               /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6 */
+#endif
        }
        if (GDKgetenv("gdk_mem_pagebits") == NULL) {
                snprintf(buf, sizeof(buf), "%d", GDK_mem_pagebits);
@@ -1105,18 +1110,6 @@ GDKinit(opt *set, int setlen)
                GDKsetenv("monet_pid", buf);
        }
 
-       GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0);
-       if (GDKnr_threads == 0)
-               GDKnr_threads = MT_check_nr_cores();
-#ifdef NATIVE_WIN32
-       GDK_mmap_minsize /= (GDKnr_threads ? GDKnr_threads : 1);
-#else
-       /* WARNING: This unconditionally overwrites above settings, */
-       /* incl. setting via MonetDB env. var. "gdk_mmap_minsize" ! */
-       GDK_mmap_minsize = MIN( 1<<30 , (GDK_mem_maxsize_max/6) / (GDKnr_threads ? GDKnr_threads : 1) );
-       /*    per op:  2 args + 1 res, each with head & tail  =>  (2+1)*2 = 6  ^ */
-#endif
-
        if ((p = mo_find_option(set, setlen, "gdk_vmtrim")) == NULL ||
            strcasecmp(p, "yes") == 0)
                MT_create_thread(&GDKvmtrim_id, GDKvmtrim, &GDK_mem_maxsize,
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -88,6 +88,8 @@ typedef struct DATAFLOW {
 static struct worker {
        MT_Id id;
        enum {IDLE, RUNNING, EXITED} flag;
+       Client cntxt;                           /* client we do work for (NULL -> any) */
+       MT_Sema s;
 } workers[THREADS];
 static Queue *todo = 0;        /* pending instructions */
 static int volatile exiting = 0;
@@ -207,16 +209,33 @@ q_requeue(Queue *q, FlowEvent d)
 }
 #endif
 
-static void *
-q_dequeue(Queue *q)
+static FlowEvent
+q_dequeue(Queue *q, Client cntxt)
 {
-       void *r = NULL;
+       FlowEvent r = NULL;
 
        assert(q);
        MT_sema_down(&q->s, "q_dequeue");
        if (exiting)
                return NULL;
        MT_lock_set(&q->l, "q_dequeue");
+       if (cntxt) {
+               int i;
+
+               for (i = q->last - 1; i >= 0; i--) {
+                       if (q->data[i]->flow->cntxt == cntxt) {
+                               r = q->data[i];
+                               q->last--;
+                               while (i < q->last) {
+                                       q->data[i] = q->data[i + 1];
+                                       i++;
+                               }
+                               break;
+                       }
+               }
+               MT_lock_unset(&q->l, "q_dequeue");
+               return r;
+       }
        if (q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l, "q_dequeue");
@@ -228,7 +247,7 @@ q_dequeue(Queue *q)
        assert(q->last > 0);
        if (q->last > 0) {
                /* LIFO favors garbage collection */
-               r = (void*) q->data[--q->last];
+               r = q->data[--q->last];
                q->data[q->last] = 0;
        }
        /* else: terminating */
@@ -281,10 +300,28 @@ DFLOWworker(void *T)
 
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
+       if (t->cntxt) {
+               /* wait until we are allowed to start working */
+               MT_sema_down(&t->s, "DFLOWworker");
+       }
        while (1) {
                if (fnxt == 0) {
-                       if ((fe = q_dequeue(todo)) == NULL)
-                               break;;
+                       Client cntxt = t->cntxt;
+                       fe = q_dequeue(todo, cntxt);
+                       if (fe == NULL) {
+                               if (cntxt) {
+                                       /* we're not done yet with work for the current
+                                        * client (as far as we know), so give up the CPU
+                                        * and let the scheduler enter some more work, but
+                                        * first compensate for the down we did in
+                                        * dequeue */
+                                       MT_sema_up(&todo->s, "DFLOWworker");
+                                       MT_sleep_ms(1);
+                                       continue;
+                               }
+                               /* no more work to be done: exit */
+                               break;
+                       }
                } else
                        fe = fnxt;
                if (exiting) {
@@ -399,9 +436,12 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return -1;
        }
-       limit = GDKnr_threads ? GDKnr_threads : 1;
+       for (i = 0; i < THREADS; i++)
+               MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
+       limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
        for (i = 0; i < limit; i++) {
                workers[i].flag = RUNNING;
+               workers[i].cntxt = NULL;
                if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0)
                        workers[i].flag = IDLE;
                else
@@ -563,7 +603,7 @@ static void showFlowEvent(DataFlow flow,
 */
 
 static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, struct worker *w)
 {
        int last;
        int i;
@@ -600,11 +640,12 @@ DFLOWscheduler(DataFlow flow)
                        PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
        MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
+       MT_sema_up(&w->s, "DFLOWscheduler");
 
        PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n", actions);
 
        while (actions != tasks ) {
-               f = q_dequeue(flow->done);
+               f = q_dequeue(flow->done, NULL);
                if (exiting)
                        break;
                if (f == NULL)
@@ -632,6 +673,9 @@ DFLOWscheduler(DataFlow flow)
                        }
                MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
        }
+       /* release the worker from its specific task (turn it into a
+        * generic worker) */
+       w->cntxt = NULL;
        /* wrap up errors */
        assert(flow->done->last == 0);
        if (flow->error ) {
@@ -641,6 +685,20 @@ DFLOWscheduler(DataFlow flow)
        return ret;
 }
 
+/* We create a pool of GDKnr_threads-1 generic workers, that is,
+ * workers that will take on jobs from any client.  In addition, we
+ * create a single specific worker per client (i.e. each time we enter
+ * here).  This specific worker will only do work for the client for
+ * which it was started.  In this way we can guarantee that there will
+ * always be progress for the client, even if all other workers are
+ * doing something big.
+ *
+ * When all jobs for a client have been done (there are no more
+ * entries for the client in the queue), the specific worker turns
+ * itself into a generic worker.  At the same time, we signal that one
+ * generic worker should exit and this function returns.  In this way
+ * we make sure that there are once again GDKnr_threads-1 generic
+ * workers. */
 str
 runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
 {
@@ -670,44 +728,64 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        /* check existence of workers */
        if (todo == NULL) {
                /* create thread pool */
-               if (DFLOWinitialize() < 0) {
+               if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
                        /* no threads created, run serially */
                        *ret = TRUE;
                        return MAL_SUCCEED;
                }
                i = THREADS;                    /* we didn't create an extra thread */
-       } else 
-       if( stk->calldepth){
-               /* create one more worker to compensate for our waiting until
-                * all work is done, provided we are about to perform a recursive call */
-               MT_lock_set(&mal_contextLock, "runMALdataflow");
-               for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
-                       if (workers[i].flag == EXITED) {
-                               todo->exitedcount--;
-                               workers[i].flag = IDLE;
-                               MT_join_thread(workers[i].id);
+       }
+       assert(todo);
+       /* in addition, create one more worker that will only execute
+        * tasks for the current client to compensate for our waiting
+        * until all work is done */
+       MT_lock_set(&mal_contextLock, "runMALdataflow");
+       /* join with already exited threads */
+       for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
+               if (workers[i].flag == EXITED) {
+                       todo->exitedcount--;
+                       workers[i].flag = IDLE;
+                       workers[i].cntxt = NULL;
+                       MT_join_thread(workers[i].id);
+               }
+       }
+       for (i = 0; i < THREADS; i++) {
+               if (workers[i].flag == IDLE) {
+                       /* only create specific worker if we are not doing a
+                        * recursive call */
+                       if (stk->calldepth > 1) {
+                               int j;
+                               MT_Id pid = MT_getpid();
+
+                               /* doing a recursive call: copy specificity from
+                                * current worker to new worker */
+                               workers[i].cntxt = NULL;
+                               for (j = 0; j < THREADS; j++) {
+                                       if (workers[j].flag == RUNNING && workers[j].id == pid) {
+                                               workers[i].cntxt = workers[j].cntxt;
+                                               break;
+                                       }
+                               }
+                       } else {
+                               /* not doing a recursive call: create specific worker */
+                               workers[i].cntxt = cntxt;
                        }
+                       if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
+                               /* cannot start new thread, run serially */
+                               *ret = TRUE;
+                               MT_lock_unset(&mal_contextLock, "runMALdataflow");
+                               return MAL_SUCCEED;
+                       }
+                       workers[i].flag = RUNNING;
+                       break;
                }
-               for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag == IDLE) {
-                               if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
-                                       /* cannot start new thread, run serially */
-                                       *ret = TRUE;
-                                       MT_lock_unset(&mal_contextLock, "runMALdataflow");
-                                       return MAL_SUCCEED;
-                               }
-                               workers[i].flag = RUNNING;
-                               break;
-                       }
-               }
-               MT_lock_unset(&mal_contextLock, "runMALdataflow");
-               if (i == THREADS) {
-                       /* no empty threads slots found, run serially */
-                       *ret = TRUE;
-                       return MAL_SUCCEED;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to