Changeset: bd832396f821 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bd832396f821
Added Files:
        monetdb5/scheduler/mut_leftjoin.c
        monetdb5/scheduler/mut_leftjoin.h
        monetdb5/scheduler/mut_select.c
        monetdb5/scheduler/mut_select.h
        monetdb5/scheduler/mut_util.c
        monetdb5/scheduler/mut_util.h
Removed Files:
        monetdb5/scheduler/mut_stopRuns.c
        monetdb5/scheduler/mut_stopRuns.h
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/modules/mal/language.c
        monetdb5/optimizer/opt_multicore.c
        monetdb5/scheduler/Makefile.ag
        monetdb5/scheduler/mut_policy.c
        monetdb5/scheduler/mut_transforms.c
        monetdb5/scheduler/mut_transforms.h
        monetdb5/scheduler/run_multicore.c
        monetdb5/scheduler/run_multicore.h
        monetdb5/scheduler/run_octopus.c
        monetdb5/scheduler/srvpool.c
        sql/test/BugTracker-2013/Tests/All
        sql/test/BugTracker-2013/Tests/nestedcalls.sql
Branch: mutation
Log Message:

Code reshuffling and cleanup.


diffs (truncated from 3371 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1368,7 +1368,10 @@ str CSTrelease(Client cntxt, MalBlkPtr m
 str CSTreleaseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str CSTtake(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str CSTtoString(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+str CURLdeleteRequest(str *retval, str *url);
 str CURLgetRequest(str *retval, str *url);
+str CURLpostRequest(str *retval, str *url);
+str CURLputRequest(str *retval, str *url);
 str CemptySet(int *k, int *bid);
 str DICTbind(int *idx, int *val, str *nme);
 str DICTcompress(int *idx, str *nme, int *bid);
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -83,10 +83,9 @@ typedef struct DATAFLOW {
        Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-#define MAXQ 1024
-static MT_Id workers[THREADS] = {0};
-static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */
-static Queue *todo[MAXQ] = {0};        /* pending instructions organized by user MAXTODO > #users */
+#define MAXQ 256
+static Queue *todos[MAXQ] = {0};       /* pending instructions organized by dataflow block */
+static bit occupied[MAXQ]={0};                 /* worker pool is in use? */
 static int volatile exiting = 0;
 
 /*
@@ -280,12 +279,10 @@ DFLOWworker(void *t)
 {
        DataFlow flow;
        FlowEvent fe = 0, fnxt = 0;
-       int id = (int) ((MT_Id *) t - workers), last = 0;
-       int wq;
        Thread thr;
        str error = 0;
-
-       int i;
+       Queue *todo = *(Queue **) t;
+       int i,last;
 
        thr = THRnew("DFLOWworker");
 
@@ -294,10 +291,8 @@ DFLOWworker(void *t)
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
-               assert(workerqueue[id] > 0);
-               wq = workerqueue[id] - 1;
                if (fnxt == 0)
-                       fe = q_dequeue(todo[wq]);
+                       fe = q_dequeue(todo);
                else
                        fe = fnxt;
                if (exiting) {
@@ -319,16 +314,15 @@ DFLOWworker(void *t)
 #ifdef USE_MAL_ADMISSION
                        if (MALadmission(fe->argclaim, fe->hotclaim)) {
                                fe->hotclaim = 0;   /* don't assume priority anymore */
-                               assert(todo[wq]);
-                               if (todo[wq]->last == 0)
+                               if (todo->last == 0)
                                        MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo[wq], fe);
+                               q_requeue(todo, fe);
                                continue;
                        }
 #endif
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
                        PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
-                                                                 fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+                                                                 fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
                        /* release the memory claim */
                        MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -381,69 +375,64 @@ DFLOWworker(void *t)
                                MALresourceFairness(GDKusec()- flow->mb->starttime);
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
-                       assert(todo[wq]);
-                       if (todo[wq]->last == 0)
+                       if (todo->last == 0)
                                profilerHeartbeatEvent("wait");
                }
        }
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
-       workerqueue[wq] = 0;
-       workers[wq] = 0;
        THRdel(thr);
 }
 
 /* 
- * Create a set of DFLOW interpreters.
+ * Create an interpreter pool.
  * One worker will adaptively be available for each client.
  * The remainder are taken from the GDKnr_threads argument and
  * typically is equal to the number of cores.
  * A recursive MAL function call would make for one worker less,
  * which limits the number of cores for parallel processing.
  * The workers are assembled in a local table to enable debugging.
+ *
+ * BEWARE, failure to create a new worker thread is not an error
+ * but would lead to serial execution.
  */
-static str
-DFLOWinitialize(int index)
+static int
+DFLOWinitialize(void)
 {
-       int i, worker, limit;
+       int i, threads, grp;
+       MT_Id worker;
 
-       assert(index >= 0);
-       assert(index < THREADS);
+       threads = GDKnr_threads ? GDKnr_threads : 1;
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       if (todo[index]) {
-               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               return MAL_SUCCEED;
+       for(grp = 0; grp< MAXQ; grp++)
+               if ( occupied[grp] == FALSE){
+                       occupied[grp] = TRUE;
+                       break;
+               }
+       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+       if (grp > THREADS) {
+               // continue non-parallel
+               return -1;
        }
-       todo[index] = q_create(2048, "todo");
-       if (todo[index] == NULL) {
-               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               throw(MAL, "dataflow", "DFLOWinitialize(): Failed to create todo queue");
+       if ( todos[grp] )
+               return grp;
+
+       todos[grp] = q_create(2048, "todo");
+       if (todos[grp] == NULL) 
+               return -1;
+
+       // associate a set of workers with the pool
+       for (i = 0; grp>= 0 && i < threads; i++){
+               if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
+                       //Can not create interpreter thread
+                       grp = -1;
+               }
+               if (worker == 0) {
+                       //Failed to create interpreter thread
+                       grp = -1;
+               }
        }
-       limit = GDKnr_threads ? GDKnr_threads : 1;
-       if (limit > THREADS) {
-               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-               throw(MAL, "dataflow", "DFLOWinitialize(): using more threads than thread slots: %d > %d", limit, THREADS);
-       }
-       for (worker = 0, i = 0; i < limit; i++){
-               for (; worker < THREADS; worker++)
-                       if( workers[worker] == 0)
-                               break;
-               if (worker >= THREADS || workers[worker] > 0) {
-                       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-                       throw(MAL, "dataflow", "No free worker slot found");
-               }
-               if (MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE) < 0) {
-                       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-                       throw(MAL, "dataflow", "Can not create interpreter thread");
-               }
-               if (workers[worker] == 0) {
-                       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-                       throw(MAL, "dataflow", "Failed to create interpreter thread");
-               }
-               workerqueue[worker] = index + 1;
-       }
-       MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-       return MAL_SUCCEED;
+       return grp;
 }
  
 /*
@@ -582,7 +571,7 @@ static void showFlowEvent(DataFlow flow,
 */
 
 static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, Queue *todo)
 {
        int last;
        int i;
@@ -593,7 +582,6 @@ DFLOWscheduler(DataFlow flow)
        int tasks=0, actions;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
-       int wq;
 
        if (flow == NULL)
                throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
@@ -604,7 +592,6 @@ DFLOWscheduler(DataFlow flow)
        fe = flow->status;
 
        MT_lock_set(&flow->flowlock, "MALworker");
-       wq = flow->cntxt->idx;
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
@@ -616,7 +603,7 @@ DFLOWscheduler(DataFlow flow)
                        for (j = p->retc; j < p->argc; j++)
                                fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
 #endif
-                       q_enqueue(todo[wq], flow->status + i);
+                       q_enqueue(todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
@@ -645,7 +632,7 @@ DFLOWscheduler(DataFlow flow)
                                if (flow->status[i].blocks == 1 ) {
                                        flow->status[i].state = DFLOWrunning;
                                        flow->status[i].blocks--;
-                                       q_enqueue(todo[wq], flow->status + i);
+                                       q_enqueue(todo, flow->status + i);
                                        PARDEBUG
                                        mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                                } else {
@@ -664,12 +651,12 @@ DFLOWscheduler(DataFlow flow)
 }
 
 str
-runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, MalStkPtr stk)
+runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
 {
        DataFlow flow = NULL;
-       str ret = MAL_SUCCEED;
-       int size;
-       int stoppc = getInstrPtr(mb,startpc)->jump;
+       str msg = MAL_SUCCEED;
+       int size, pool;
+       int *ret;
 
 #ifdef DEBUG_FLOW
        mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc, stoppc);
@@ -679,19 +666,24 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        /* in debugging mode we should not start multiple threads */
        if (stk == NULL)
                throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
-       if (stk->cmd)
+       ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
+       if (stk->cmd){
+               *ret = TRUE;
                return MAL_SUCCEED;
+       }
        /* too many threads turns dataflow processing off */
-       if ( cntxt->idx > MAXQ)
+       if ( cntxt->idx > MAXQ){
+               *ret = TRUE;
                return MAL_SUCCEED;
+       }
 
-       assert(stoppc > startpc || stoppc == 0);
+       assert(stoppc > startpc);
 
-       /* check existence of workers */
-       if (todo[cntxt->idx] == 0)
-               ret = DFLOWinitialize(cntxt->idx);
-       if ( ret != MAL_SUCCEED)
-               return ret;
+       /* check existence of free worker group, resort to sequential upon failure */
+       if( (pool= DFLOWinitialize()) < 0){
+               *ret = TRUE;
+               return MAL_SUCCEED;
+       }
 
        flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
        if (flow == NULL)
@@ -711,6 +703,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (flow->done == NULL) {
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
+               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue");
        }
 
@@ -719,6 +712,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                q_destroy(flow->done);
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
+               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate flow->status");
        }
        size = DFLOWgraphSize(mb, startpc, stoppc);
@@ -729,6 +723,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                q_destroy(flow->done);
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
+               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate flow->nodes");
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to