Changeset: c1f9638ec860 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c1f9638ec860
Added Files:
        sql/test/BugTracker-2013/Tests/nestedcalls.sql
        sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
        sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/modules/mal/language.c
        monetdb5/modules/mal/language.h
        sql/test/BugTracker-2013/Tests/All
Branch: SciQL-2
Log Message:

Merged with Feb2013 branch.


diffs (truncated from 660 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -35,6 +35,7 @@
  * The flow graphs should be organized such that parallel threads can
  * access it mostly without expensive locking.
  */
+#include "monetdb_config.h"
 #include "mal_dataflow.h"
 #include "mal_client.h"
 
@@ -82,10 +83,10 @@ typedef struct DATAFLOW {
        Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-#define MAXQ 1024
-static MT_Id workers[THREADS] = {0};
-static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */
-static Queue *todo[MAXQ] = {0};        /* pending instructions organized by user MAXTODO > #users */
+#define MAXQ 256
+static Queue *todos[MAXQ] = {0};       /* pending instructions organized by dataflow block */
+static bit occupied[MAXQ]={0};                 /* worker pool is in use? */
+static int volatile exiting = 0;
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
  */
 
 static Queue*
-q_create(int sz)
+q_create(int sz, const char *name)
 {
-       const char* name = "q_create";
        Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
 
        if (q == NULL)
@@ -208,6 +208,8 @@ q_dequeue(Queue *q)
 
        assert(q);
        MT_sema_down(&q->s, "q_dequeue");
+       if (exiting)
+               return NULL;
        MT_lock_set(&q->l, "q_dequeue");
        assert(q->last > 0);
        if (q->last > 0) {
@@ -255,25 +257,23 @@ DFLOWworker(void *t)
 {
        DataFlow flow;
        FlowEvent fe = 0, fnxt = 0;
-       int id = (int) ((MT_Id *) t - workers), last = 0;
-       int wq;
        Thread thr;
        str error = 0;
-
-       int i;
-       lng usec = 0;
+       Queue *todo = *(Queue **) t;
+       int i,last;
 
        thr = THRnew("DFLOWworker");
 
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
-               assert(workerqueue[id] > 0);
-               wq = workerqueue[id] - 1;
                if (fnxt == 0)
-                       fe = q_dequeue(todo[wq]);
+                       fe = q_dequeue(todo);
                else
                        fe = fnxt;
+               if (exiting) {
+                       break;
+               }
                fnxt = 0;
                assert(fe);
                flow = fe->flow;
@@ -285,22 +285,20 @@ DFLOWworker(void *t)
                        continue;
                }
 
-               usec = GDKusec();
                /* skip all instructions when we have encontered an error */
                if (flow->error == 0) {
 #ifdef USE_MAL_ADMISSION
                        if (MALadmission(fe->argclaim, fe->hotclaim)) {
                                fe->hotclaim = 0;   /* don't assume priority anymore */
-                               assert(todo[wq]);
-                               if (todo[wq]->last == 0)
+                               if (todo->last == 0)
                                        MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo[wq], fe);
+                               q_requeue(todo, fe);
                                continue;
                        }
 #endif
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
                        PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
-                                                                 fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+                                                                 fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
                        /* release the memory claim */
                        MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -331,8 +329,8 @@ DFLOWworker(void *t)
                InstrPtr p = getInstrPtr(flow->mb, fe->pc);
                assert(p);
                fe->hotclaim = 0;
-               for (i = 0; i < p->retc; i++)
-                       fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
+               //for (i = 0; i < p->retc; i++)
+                       //fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
                }
 #endif
                MT_lock_set(&flow->flowlock, "MALworker");
@@ -351,56 +349,64 @@ DFLOWworker(void *t)
 
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
-                       assert(todo[wq]);
-                       if (todo[wq]->last == 0)
+                       if (todo->last == 0)
                                profilerHeartbeatEvent("wait");
-                       else
-                               MALresourceFairness(NULL, NULL, usec);
                }
        }
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
-       workerqueue[wq] = 0;
-       workers[wq] = 0;
        THRdel(thr);
 }
 
 /* 
- * Create a set of DFLOW interpreters.
+ * Create an interpreter pool.
  * One worker will adaptively be available for each client.
  * The remainder are taken from the GDKnr_threads argument and
- * typically is equal to the number of cores
+ * typically is equal to the number of cores.
+ * A recursive MAL function call would make for one worker less,
+ * which limits the number of cores for parallel processing.
  * The workers are assembled in a local table to enable debugging.
+ *
+ * BEWARE, failure to create a new worker thread is not an error
+ * but would lead to serial execution.
  */
-static void
-DFLOWinitialize(int index)
+static int
+DFLOWinitialize(void)
 {
-       int i, worker, limit;
+       int i, threads, grp;
+       MT_Id worker;
 
-       assert(index >= 0);
-       assert(index < THREADS);
+       threads = GDKnr_threads ? GDKnr_threads : 1;
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       if (todo[index]) {
-               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               return;
+       for(grp = 0; grp< MAXQ; grp++)
+               if ( occupied[grp] == FALSE){
+                       occupied[grp] = TRUE;
+                       break;
+               }
+       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+       if (grp > THREADS) {
+               // continue non-parallel
+               return -1;
        }
-       todo[index] = q_create(2048);
-       assert(todo[index]);
-       limit = GDKnr_threads ? GDKnr_threads : 1;
-       assert(limit <= THREADS);
-       for (worker = 0, i = 0; i < limit; i++){
-               for (; worker < THREADS; worker++)
-                       if( workers[worker] == 0)
-                               break;
-               assert(worker < THREADS);
-               if (worker < THREADS) {
-                       assert(workers[worker] == 0);
-                       MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE);
-                       assert(workers[worker] > 0);
-                       workerqueue[worker] = index + 1;
+       if ( todos[grp] )
+               return grp;
+
+       todos[grp] = q_create(2048, "todo");
+       if (todos[grp] == NULL) 
+               return -1;
+
+       // associate a set of workers with the pool
+       for (i = 0; grp>= 0 && i < threads; i++){
+               if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
+                       //Can not create interpreter thread
+                       grp = -1;
+               }
+               if (worker == 0) {
+                       //Failed to create interpreter thread
+                       grp = -1;
                }
        }
-       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+       return grp;
 }
  
 /*
@@ -409,18 +415,28 @@ DFLOWinitialize(int index)
  * For each instruction we keep a list of instructions whose
  * blocking counter should be decremented upon finishing it.
  */
-static void
+static str
 DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
 {
        int pc, i, j, k, l, n, etop = 0;
        int *assign;
        InstrPtr p;
 
+       if (flow == NULL)
+               throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
+       if (mb == NULL)
+               throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
        PARDEBUG printf("Initialize dflow block\n");
        assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
+       if (assign == NULL)
+               throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
        etop = flow->stop - flow->start;
        for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
                p = getInstrPtr(mb, pc);
+               if (p == NULL) {
+                       GDKfree(assign);
+                       throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
+               }
 
                /* initial state, ie everything can run */
                flow->status[n].flow = flow;
@@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
 #ifdef USE_MAL_ADMISSION
        memorypool = memoryclaims = 0;
 #endif
+       return MAL_SUCCEED;
 }
 
 /*
@@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow,
 */
 
 static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, Queue *todo)
 {
        int last;
        int i;
 #ifdef USE_MAL_ADMISSION
-       int j;
+       //int j;
        InstrPtr p;
 #endif
        int tasks=0, actions;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
-       int wq;
 
        if (flow == NULL)
                throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
@@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow)
        /* initialize the eligible statements */
        fe = flow->status;
 
-       if (fe[0].flow->cntxt->flags & timerFlag)
-               fe[0].flow->cntxt->timer = GDKusec();
-
        MT_lock_set(&flow->flowlock, "MALworker");
-       wq = flow->cntxt->idx;
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
                        p = getInstrPtr(flow->mb,fe[i].pc);
-                       for (j = p->retc; j < p->argc; j++)
-                               fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE);
+                       if (p == NULL) {
+                               MT_lock_unset(&flow->flowlock, "MALworker");
+                               throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
+                       }
+                       //for (j = p->retc; j < p->argc; j++)
+                               //fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
 #endif
-                       q_enqueue(todo[wq], flow->status + i);
+                       q_enqueue(todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
@@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
 
        while (actions != tasks ) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to