Changeset: 4c47797192f4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4c47797192f4
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:

Add multiple worker pools
To avoid users being blocked in accessing the server,
we switched from a single central pool of workers to
one pool per user. Once we run out of thread slots,
all subsequent processing runs in sequential mode.


diffs (240 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -36,6 +36,7 @@
  * access it mostly without expensive locking.
  */
 #include "mal_dataflow.h"
+#include "mal_client.h"
 
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
@@ -61,7 +62,7 @@ typedef struct queue {
        FlowEvent *data;
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
-} queue;
+} Queue;
 
 /*
  * The dataflow dependency is administered in a graph list structure.
@@ -78,11 +79,13 @@ typedef struct DATAFLOW {
        int *nodes;         /* dependency graph nodes */
        int *edges;         /* dependency graph */
        MT_Lock flowlock;   /* lock to protect the above */
-       queue *done;        /* instructions handled */
+       Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
+#define MAXQ 1024
 static MT_Id workers[THREADS];
-static queue *todo = 0;        /* pending instructions */
+static int workerqueue[THREADS]; /* maps workers towards the todo queues */
+static Queue *todo[MAXQ];      /* pending instructions organized by user 
MAXQ > #users */
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -104,10 +107,10 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
  * can be executed in parallel.
  */
 
-static queue*
+static Queue*
 q_create(int sz)
 {
-       queue *q = (queue*)GDKmalloc(sizeof(queue));
+       Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
 
        if (q == NULL)
                return NULL;
@@ -125,7 +128,7 @@ q_create(int sz)
 }
 
 static void
-q_destroy(queue *q)
+q_destroy(Queue *q)
 {
        MT_lock_destroy(&q->l);
        MT_sema_destroy(&q->s);
@@ -136,7 +139,7 @@ q_destroy(queue *q)
 /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue 
is possible */
 /* we might actually sort it for better scheduling behavior */
 static void
-q_enqueue_(queue *q, FlowEvent d)
+q_enqueue_(Queue *q, FlowEvent d)
 {
        assert(d);
        if (q->last == q->size) {
@@ -146,7 +149,7 @@ q_enqueue_(queue *q, FlowEvent d)
        q->data[q->last++] = d;
 }
 static void
-q_enqueue(queue *q, FlowEvent d)
+q_enqueue(Queue *q, FlowEvent d)
 {
        MT_lock_set(&q->l, "q_enqueue");
        q_enqueue_(q, d);
@@ -162,7 +165,7 @@ q_enqueue(queue *q, FlowEvent d)
 
 #ifdef USE_MAL_ADMISSION
 static void
-q_requeue_(queue *q, FlowEvent d)
+q_requeue_(Queue *q, FlowEvent d)
 {
        int i;
 
@@ -178,7 +181,7 @@ q_requeue_(queue *q, FlowEvent d)
        q->last++;
 }
 static void
-q_requeue(queue *q, FlowEvent d)
+q_requeue(Queue *q, FlowEvent d)
 {
        assert(d);
        MT_lock_set(&q->l, "q_requeue");
@@ -189,7 +192,7 @@ q_requeue(queue *q, FlowEvent d)
 #endif
 
 static void *
-q_dequeue(queue *q)
+q_dequeue(Queue *q)
 {
        void *r = NULL;
 
@@ -242,6 +245,7 @@ DFLOWworker(void *t)
        DataFlow flow;
        FlowEvent fe = 0, fnxt = 0;
        int id = (int) ((MT_Id *) t - workers), last = 0;
+       int wq;
        Thread thr;
        str error = 0;
 
@@ -253,8 +257,9 @@ DFLOWworker(void *t)
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
+               wq = workerqueue[id];
                if (fnxt == 0)
-                       fe = q_dequeue(todo);
+                       fe = q_dequeue(todo[wq]);
                else fe = fnxt;
                fnxt = 0;
                assert(fe);
@@ -272,9 +277,9 @@ DFLOWworker(void *t)
 #ifdef USE_MAL_ADMISSION
                        if (MALadmission(fe->argclaim, fe->hotclaim)) {
                                fe->hotclaim = 0;   /* don't assume priority 
anymore */
-                               if (todo->last == 0)
+                               if (todo[wq]->last == 0)
                                        MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo, fe);
+                               q_requeue(todo[wq], fe);
                                continue;
                        }
 #endif
@@ -330,7 +335,7 @@ DFLOWworker(void *t)
 
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
-                       if (todo->last == 0)
+                       if (todo[wq]->last == 0)
                                profilerHeartbeatEvent("wait");
                        else
                                MALresourceFairness(NULL, NULL, usec);
@@ -338,6 +343,8 @@ DFLOWworker(void *t)
        }
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
+       workerqueue[wq] = 0;
+       workers[wq] = 0;
        THRdel(thr);
 }
 
@@ -349,19 +356,27 @@ DFLOWworker(void *t)
  * The workers are assembled in a local table to enable debugging.
  */
 static void
-DFLOWinitialize(void)
+DFLOWinitialize(int index)
 {
-       int i, limit;
+       int i, worker, limit;
 
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       if (todo) {
+       if (todo[index]) {
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return;
        }
-       todo = q_create(2048);
+       todo[index] = q_create(2048);
        limit = GDKnr_threads ? GDKnr_threads : 1;
-       for (i = 0; i < limit; i++)
-               MT_create_thread(&workers[i], DFLOWworker, (void *) 
&workers[i], MT_THR_JOINABLE);
+       for (worker = 0; worker < THREADS; worker++)
+               if( workers[worker] == 0)
+                       break;
+       for (i = 0; i < limit; i++){
+               MT_create_thread(&workers[worker], DFLOWworker, (void *) 
&workers[worker], MT_THR_JOINABLE);
+               workerqueue[worker] = index;
+               for (; worker < THREADS; worker++)
+                       if( workers[worker] == 0)
+                               break;
+       }
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
 }
  
@@ -501,6 +516,7 @@ DFLOWscheduler(DataFlow flow)
        int tasks=0, actions = flow->stop - flow->start;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
+       int wq;
 
        if (actions == 0)
                throw(MAL, "dataflow", "Empty dataflow block");
@@ -511,6 +527,7 @@ DFLOWscheduler(DataFlow flow)
                fe[0].flow->cntxt->timer = GDKusec();
 
        MT_lock_set(&flow->flowlock, "MALworker");
+       wq = flow->cntxt->idx;
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
@@ -518,7 +535,7 @@ DFLOWscheduler(DataFlow flow)
                        for (j = p->retc; j < p->argc; j++)
                                fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, 
fe[0].flow->stk, fe[i].pc, j, FALSE);
 #endif
-                       q_enqueue(todo, flow->status + i);
+                       q_enqueue(todo[wq], flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
@@ -543,7 +560,7 @@ DFLOWscheduler(DataFlow flow)
                                if (flow->status[i].blocks == 1 ) {
                                        flow->status[i].state = DFLOWrunning;
                                        flow->status[i].blocks--;
-                                       q_enqueue(todo, flow->status + i);
+                                       q_enqueue(todo[wq], flow->status + i);
                                        PARDEBUG
                                        mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                                } else {
@@ -576,14 +593,15 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        /* in debugging mode we should not start multiple threads */
        if (stk->cmd)
                return MAL_SUCCEED;
+       /* too many threads turns dataflow processing off */
+       if ( cntxt->idx > MAXQ)
+               return MAL_SUCCEED;
 
        assert(stoppc > startpc);
 
        /* check existence of workers */
-       if (workers[0] == 0)
-               DFLOWinitialize();
-       assert(workers[0]);
-       assert(todo);
+       if (todo[cntxt->idx] == 0)
+               DFLOWinitialize(cntxt->idx);
 
        flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to