Changeset: 4c47797192f4 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4c47797192f4 Modified Files: monetdb5/mal/mal_dataflow.c Branch: Feb2013 Log Message:
Add multiple worker pools. To avoid users being blocked when accessing the server, we switched from a single central pool of workers to one pool per user. Once we run out of thread slots, all subsequent processing runs in sequential mode. diffs (240 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -36,6 +36,7 @@ * access it mostly without expensive locking. */ #include "mal_dataflow.h" +#include "mal_client.h" #define DFLOWpending 0 /* runnable */ #define DFLOWrunning 1 /* currently in progress */ @@ -61,7 +62,7 @@ typedef struct queue { FlowEvent *data; MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ -} queue; +} Queue; /* * The dataflow dependency is administered in a graph list structure. @@ -78,11 +79,13 @@ typedef struct DATAFLOW { int *nodes; /* dependency graph nodes */ int *edges; /* dependency graph */ MT_Lock flowlock; /* lock to protect the above */ - queue *done; /* instructions handled */ + Queue *done; /* instructions handled */ } *DataFlow, DataFlowRec; +#define MAXQ 1024 static MT_Id workers[THREADS]; -static queue *todo = 0; /* pending instructions */ +static int workerqueue[THREADS]; /* maps workers towards the todo queues */ +static Queue *todo[MAXQ]; /* pending instructions organized by user MAXTODO > #users */ /* * Calculate the size of the dataflow dependency graph. @@ -104,10 +107,10 @@ DFLOWgraphSize(MalBlkPtr mb, int start, * can be executed in parallel. */ -static queue* +static Queue* q_create(int sz) { - queue *q = (queue*)GDKmalloc(sizeof(queue)); + Queue *q = (Queue*)GDKmalloc(sizeof(Queue)); if (q == NULL) return NULL; @@ -125,7 +128,7 @@ q_create(int sz) } static void -q_destroy(queue *q) +q_destroy(Queue *q) { MT_lock_destroy(&q->l); MT_sema_destroy(&q->s); @@ -136,7 +139,7 @@ q_destroy(queue *q) /* keep a simple LIFO queue.
It won't be a large one, so shuffles of requeue is possible */ /* we might actually sort it for better scheduling behavior */ static void -q_enqueue_(queue *q, FlowEvent d) +q_enqueue_(Queue *q, FlowEvent d) { assert(d); if (q->last == q->size) { @@ -146,7 +149,7 @@ q_enqueue_(queue *q, FlowEvent d) q->data[q->last++] = d; } static void -q_enqueue(queue *q, FlowEvent d) +q_enqueue(Queue *q, FlowEvent d) { MT_lock_set(&q->l, "q_enqueue"); q_enqueue_(q, d); @@ -162,7 +165,7 @@ q_enqueue(queue *q, FlowEvent d) #ifdef USE_MAL_ADMISSION static void -q_requeue_(queue *q, FlowEvent d) +q_requeue_(Queue *q, FlowEvent d) { int i; @@ -178,7 +181,7 @@ q_requeue_(queue *q, FlowEvent d) q->last++; } static void -q_requeue(queue *q, FlowEvent d) +q_requeue(Queue *q, FlowEvent d) { assert(d); MT_lock_set(&q->l, "q_requeue"); @@ -189,7 +192,7 @@ q_requeue(queue *q, FlowEvent d) #endif static void * -q_dequeue(queue *q) +q_dequeue(Queue *q) { void *r = NULL; @@ -242,6 +245,7 @@ DFLOWworker(void *t) DataFlow flow; FlowEvent fe = 0, fnxt = 0; int id = (int) ((MT_Id *) t - workers), last = 0; + int wq; Thread thr; str error = 0; @@ -253,8 +257,9 @@ DFLOWworker(void *t) GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) { + wq = workerqueue[id]; if (fnxt == 0) - fe = q_dequeue(todo); + fe = q_dequeue(todo[wq]); else fe = fnxt; fnxt = 0; assert(fe); @@ -272,9 +277,9 @@ DFLOWworker(void *t) #ifdef USE_MAL_ADMISSION if (MALadmission(fe->argclaim, fe->hotclaim)) { fe->hotclaim = 0; /* don't assume priority anymore */ - if (todo->last == 0) + if (todo[wq]->last == 0) MT_sleep_ms(DELAYUNIT); - q_requeue(todo, fe); + q_requeue(todo[wq], fe); continue; } #endif @@ -330,7 +335,7 @@ DFLOWworker(void *t) q_enqueue(flow->done, fe); if ( fnxt == 0) { - if (todo->last == 0) + if (todo[wq]->last == 0) profilerHeartbeatEvent("wait"); else MALresourceFairness(NULL, NULL, usec); @@ -338,6 +343,8 @@ DFLOWworker(void *t) } GDKfree(GDKerrbuf); GDKsetbuf(0); + 
workerqueue[wq] = 0; + workers[wq] = 0; THRdel(thr); } @@ -349,19 +356,27 @@ DFLOWworker(void *t) * The workers are assembled in a local table to enable debugging. */ static void -DFLOWinitialize(void) +DFLOWinitialize(int index) { - int i, limit; + int i, worker, limit; MT_lock_set(&mal_contextLock, "DFLOWinitialize"); - if (todo) { + if (todo[index]) { MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); return; } - todo = q_create(2048); + todo[index] = q_create(2048); limit = GDKnr_threads ? GDKnr_threads : 1; - for (i = 0; i < limit; i++) - MT_create_thread(&workers[i], DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE); + for (worker = 0; worker < THREADS; worker++) + if( workers[worker] == 0) + break; + for (i = 0; i < limit; i++){ + MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE); + workerqueue[worker] = index; + for (; worker < THREADS; worker++) + if( workers[worker] == 0) + break; + } MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); } @@ -501,6 +516,7 @@ DFLOWscheduler(DataFlow flow) int tasks=0, actions = flow->stop - flow->start; str ret = MAL_SUCCEED; FlowEvent fe, f = 0; + int wq; if (actions == 0) throw(MAL, "dataflow", "Empty dataflow block"); @@ -511,6 +527,7 @@ DFLOWscheduler(DataFlow flow) fe[0].flow->cntxt->timer = GDKusec(); MT_lock_set(&flow->flowlock, "MALworker"); + wq = flow->cntxt->idx; for (i = 0; i < actions; i++) if (fe[i].blocks == 0) { #ifdef USE_MAL_ADMISSION @@ -518,7 +535,7 @@ DFLOWscheduler(DataFlow flow) for (j = p->retc; j < p->argc; j++) fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE); #endif - q_enqueue(todo, flow->status + i); + q_enqueue(todo[wq], flow->status + i); flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } @@ -543,7 +560,7 @@ DFLOWscheduler(DataFlow flow) if (flow->status[i].blocks == 1 ) { flow->status[i].state = DFLOWrunning; 
flow->status[i].blocks--; - q_enqueue(todo, flow->status + i); + q_enqueue(todo[wq], flow->status + i); PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } else { @@ -576,14 +593,15 @@ runMALdataflow(Client cntxt, MalBlkPtr m /* in debugging mode we should not start multiple threads */ if (stk->cmd) return MAL_SUCCEED; + /* too many threads turns dataflow processing off */ + if ( cntxt->idx > MAXQ) + return MAL_SUCCEED; assert(stoppc > startpc); /* check existence of workers */ - if (workers[0] == 0) - DFLOWinitialize(); - assert(workers[0]); - assert(todo); + if (todo[cntxt->idx] == 0) + DFLOWinitialize(cntxt->idx); flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec)); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list