Changeset: 3669ddd28bf0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3669ddd28bf0
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:

New thread group per dataflow block
The centralized worker thread group could lead to an unacceptable
situation: if one user was heavily processing complex queries,
no other user could even log into the system, because their MAL
statements ended up at the end of the shared queues.

The problem has been resolved by introducing a thread group per dataflow
block. This may lead to a larger number of worker threads, whose
resources are managed by the OS.

This resolves bug 3258.


diffs (235 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -78,12 +78,12 @@ typedef struct DATAFLOW {
        int *nodes;         /* dependency graph nodes */
        int *edges;         /* dependency graph */
        MT_Lock flowlock;   /* lock to protect the above */
+       queue *todo;            /* pending instructions */
        queue *done;        /* instructions handled */
+       int threads;            /* worker threads active */
+       MT_Id workers[THREADS];
 } *DataFlow, DataFlowRec;
 
-static MT_Id workers[THREADS];
-static queue *todo = 0;        /* pending instructions */
-
 /*
  * Calculate the size of the dataflow dependency graph.
  */
@@ -138,7 +138,6 @@ q_destroy(queue *q)
 static void
 q_enqueue_(queue *q, FlowEvent d)
 {
-       assert(d);
        if (q->last == q->size) {
                q->size <<= 1;
                q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
@@ -214,7 +213,6 @@ q_dequeue(queue *q)
         */
 
        MT_lock_unset(&q->l, "q_dequeue");
-       assert(r);
        return r;
 }
 
@@ -239,14 +237,15 @@ q_dequeue(queue *q)
 static void
 DFLOWworker(void *t)
 {
-       DataFlow flow;
+       DataFlow flow = (DataFlow) t;
        FlowEvent fe = 0, fnxt = 0;
-       int id = (int) ((MT_Id *) t - workers), last = 0;
+       MT_Id id = MT_getpid();
+       int last = 0;
        Thread thr;
        str error = 0;
 
        int i;
-       lng usec = 0;
+       //lng usec = 0;
 
        thr = THRnew("DFLOWworker");
 
@@ -254,8 +253,10 @@ DFLOWworker(void *t)
        GDKerrbuf[0] = 0;
        while (1) {
                if (fnxt == 0)
-                       fe = q_dequeue(todo);
+                       fe = q_dequeue(flow->todo);
                else fe = fnxt;
+               if ( fe == 0)
+                       break;
                fnxt = 0;
                assert(fe);
                flow = fe->flow;
@@ -266,20 +267,20 @@ DFLOWworker(void *t)
                        continue;
                }
 
-               usec = GDKusec();
+               //usec = GDKusec();
                /* skip all instructions when we have encontered an error */
                if (flow->error == 0) {
 #ifdef USE_MAL_ADMISSION
                        if (MALadmission(fe->argclaim, fe->hotclaim)) {
                                fe->hotclaim = 0;   /* don't assume priority 
anymore */
-                               if (todo->last == 0)
+                               if (flow->todo->last == 0)
                                        MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo, fe);
+                               q_requeue(flow->todo, fe);
                                continue;
                        }
 #endif
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1, flow->stk, 0, 0);
-                       PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= 
%d claim= " LLFMT "," LLFMT " %s\n",
+                       PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= 
"SZFMT" claim= " LLFMT "," LLFMT " %s\n",
                                                                  fe->pc, id, 
fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
                        /* release the memory claim */
@@ -330,12 +331,15 @@ DFLOWworker(void *t)
 
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
-                       if (todo->last == 0)
+                       if (flow->todo->last == 0)
                                profilerHeartbeatEvent("wait");
-                       else
-                               MALresourceFairness(NULL, NULL, usec);
+                       //else
+                               //MALresourceFairness(NULL, NULL, usec);
                }
        }
+       for( i = 0; i< flow->threads; i++)
+       if ( flow->workers[i] == id)
+               flow->workers[i] = 0;
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
        THRdel(thr);
@@ -349,22 +353,51 @@ DFLOWworker(void *t)
  * The workers are assembled in a local table to enable debugging.
  */
 static void
-DFLOWinitialize(void)
+DFLOWinitialize(DataFlow flow, int size)
 {
-       int i, limit;
+       int i;
 
-       MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       if (todo) {
-               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               return;
+       MT_lock_init(&flow->flowlock, "DFLOWworker");
+       flow->todo = q_create(size);
+       flow->done = q_create(size);
+       flow->threads = GDKnr_threads ? GDKnr_threads :1;
+       for (i = 0; i < flow->threads; i++){
+               MT_create_thread(&flow->workers[i], DFLOWworker, (void *) flow, 
MT_THR_JOINABLE);
+               /* upon failure of starting threads we reduce the count */
+               if ( flow->workers[i]== 0){
+                       flow->threads --;
+                       i--;
+               }
        }
-       todo = q_create(2048);
-       limit = GDKnr_threads ? GDKnr_threads : 1;
-       for (i = 0; i < limit; i++)
-               MT_create_thread(&workers[i], DFLOWworker, (void *) 
&workers[i], MT_THR_JOINABLE);
-       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
 }
  
+static str
+DFLOWfinalize(DataFlow flow)
+{
+       int i, cnt= flow->threads, runs =0;
+
+       for( i = 0; i< cnt; i++)
+               q_enqueue(flow->todo, 0);
+       /* time out when threads are already killed */
+       do{
+               runs++;
+               cnt = 0;
+               MT_sleep_ms(1);
+               for( i = 0; i < flow->threads; i++)
+                       cnt += flow->workers[i] ==0;
+       } while( cnt != flow->threads && runs <5000);
+
+       if ( runs == 5000)
+               throw(MAL,"dataflow","Timeout on thread termination");
+       GDKfree(flow->status);
+       GDKfree(flow->edges);
+       GDKfree(flow->nodes);
+       q_destroy(flow->done);
+       q_destroy(flow->todo);
+       MT_lock_destroy(&flow->flowlock);
+       GDKfree(flow);
+       return MAL_SUCCEED;
+}
 /*
  * The dataflow administration is based on administration of
  * how many variables are still missing before it can be executed.
@@ -518,7 +551,7 @@ DFLOWscheduler(DataFlow flow)
                        for (j = p->retc; j < p->argc; j++)
                                fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, 
fe[0].flow->stk, fe[i].pc, j, FALSE);
 #endif
-                       q_enqueue(todo, flow->status + i);
+                       q_enqueue(flow->todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
@@ -543,7 +576,7 @@ DFLOWscheduler(DataFlow flow)
                                if (flow->status[i].blocks == 1 ) {
                                        flow->status[i].state = DFLOWrunning;
                                        flow->status[i].blocks--;
-                                       q_enqueue(todo, flow->status + i);
+                                       q_enqueue(flow->todo, flow->status + i);
                                        PARDEBUG
                                        mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                                } else {
@@ -579,13 +612,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 
        assert(stoppc > startpc);
 
-       /* check existence of workers */
-       if (workers[0] == 0)
-               DFLOWinitialize();
-       assert(workers[0]);
-       assert(todo);
-
        flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+       
+       DFLOWinitialize(flow, stoppc- startpc +1);
+       assert(flow->todo);
+       assert(flow->done);
 
        flow->cntxt = cntxt;
        flow->mb = mb;
@@ -596,9 +627,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        flow->start = startpc + 1;
        flow->stop = stoppc;
 
-       MT_lock_init(&flow->flowlock, "DFLOWworker");
-       flow->done = q_create(stoppc- startpc+1);
-
        flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * 
sizeof(FlowEventRec));
        size = DFLOWgraphSize(mb, startpc, stoppc);
        size += stoppc - startpc;
@@ -608,11 +636,9 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 
        ret = DFLOWscheduler(flow);
 
-       GDKfree(flow->status);
-       GDKfree(flow->edges);
-       GDKfree(flow->nodes);
-       q_destroy(flow->done);
-       MT_lock_destroy(&flow->flowlock);
-       GDKfree(flow);
+       if( ret == MAL_SUCCEED)
+               ret = DFLOWfinalize(flow);
+       else (void)
+               DFLOWfinalize(flow);
        return ret;
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to