Changeset: bd832396f821 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bd832396f821

Added Files:
        monetdb5/scheduler/mut_leftjoin.c
        monetdb5/scheduler/mut_leftjoin.h
        monetdb5/scheduler/mut_select.c
        monetdb5/scheduler/mut_select.h
        monetdb5/scheduler/mut_util.c
        monetdb5/scheduler/mut_util.h
Removed Files:
        monetdb5/scheduler/mut_stopRuns.c
        monetdb5/scheduler/mut_stopRuns.h
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/modules/mal/language.c
        monetdb5/optimizer/opt_multicore.c
        monetdb5/scheduler/Makefile.ag
        monetdb5/scheduler/mut_policy.c
        monetdb5/scheduler/mut_transforms.c
        monetdb5/scheduler/mut_transforms.h
        monetdb5/scheduler/run_multicore.c
        monetdb5/scheduler/run_multicore.h
        monetdb5/scheduler/run_octopus.c
        monetdb5/scheduler/srvpool.c
        sql/test/BugTracker-2013/Tests/All
        sql/test/BugTracker-2013/Tests/nestedcalls.sql
Branch: mutation
Log Message:
Code reshuffling and cleanup.

diffs (truncated from 3371 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1368,7 +1368,10 @@ str CSTrelease(Client cntxt, MalBlkPtr m
 str CSTreleaseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str CSTtake(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str CSTtoString(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+str CURLdeleteRequest(str *retval, str *url);
 str CURLgetRequest(str *retval, str *url);
+str CURLpostRequest(str *retval, str *url);
+str CURLputRequest(str *retval, str *url);
 str CemptySet(int *k, int *bid);
 str DICTbind(int *idx, int *val, str *nme);
 str DICTcompress(int *idx, str *nme, int *bid);
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -83,10 +83,9 @@ typedef struct DATAFLOW {
     Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-#define MAXQ 1024
-static MT_Id workers[THREADS] = {0};
-static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */
-static Queue *todo[MAXQ] = {0}; /* pending instructions organized by user MAXTODO > #users */
+#define MAXQ 256
+static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */
+static bit occupied[MAXQ]={0}; /* worker pool is in use? */
 
 static int volatile exiting = 0;
 
 /*
@@ -280,12 +279,10 @@ DFLOWworker(void *t)
 {
     DataFlow flow;
     FlowEvent fe = 0, fnxt = 0;
-    int id = (int) ((MT_Id *) t - workers), last = 0;
-    int wq;
     Thread thr;
     str error = 0;
-
-    int i;
+    Queue *todo = *(Queue **) t;
+    int i,last;
 
     thr = THRnew("DFLOWworker");
@@ -294,10 +291,8 @@ DFLOWworker(void *t)
     GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
     GDKerrbuf[0] = 0;
     while (1) {
-        assert(workerqueue[id] > 0);
-        wq = workerqueue[id] - 1;
         if (fnxt == 0)
-            fe = q_dequeue(todo[wq]);
+            fe = q_dequeue(todo);
         else
             fe = fnxt;
         if (exiting) {
@@ -319,16 +314,15 @@ DFLOWworker(void *t)
 #ifdef USE_MAL_ADMISSION
         if (MALadmission(fe->argclaim, fe->hotclaim)) {
             fe->hotclaim = 0;   /* don't assume priority anymore */
-            assert(todo[wq]);
-            if (todo[wq]->last == 0)
+            if (todo->last == 0)
                 MT_sleep_ms(DELAYUNIT);
-            q_requeue(todo[wq], fe);
+            q_requeue(todo, fe);
             continue;
         }
 #endif
         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
         PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
-            fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+            fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
         /* release the memory claim */
         MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -381,69 +375,64 @@ DFLOWworker(void *t)
             MALresourceFairness(GDKusec()- flow->mb->starttime);
         q_enqueue(flow->done, fe);
         if ( fnxt == 0) {
-            assert(todo[wq]);
-            if (todo[wq]->last == 0)
+            if (todo->last == 0)
                 profilerHeartbeatEvent("wait");
         }
     }
     GDKfree(GDKerrbuf);
     GDKsetbuf(0);
-    workerqueue[wq] = 0;
-    workers[wq] = 0;
     THRdel(thr);
 }
 
 /*
- * Create a set of DFLOW interpreters.
+ * Create an interpreter pool.
  * One worker will adaptively be available for each client.
  * The remainder are taken from the GDKnr_threads argument and
  * typically is equal to the number of cores.
  * A recursive MAL function call would make for one worker less,
  * which limits the number of cores for parallel processing.
  * The workers are assembled in a local table to enable debugging.
+ *
+ * BEWARE, failure to create a new worker thread is not an error
+ * but would lead to serial execution.
  */
-static str
-DFLOWinitialize(int index)
+static int
+DFLOWinitialize(void)
 {
-    int i, worker, limit;
+    int i, threads, grp;
+    MT_Id worker;
 
-    assert(index >= 0);
-    assert(index < THREADS);
+    threads = GDKnr_threads ? GDKnr_threads : 1;
     MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-    if (todo[index]) {
-        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-        return MAL_SUCCEED;
+    for(grp = 0; grp< MAXQ; grp++)
+        if ( occupied[grp] == FALSE){
+            occupied[grp] = TRUE;
+            break;
+        }
+    MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+    if (grp > THREADS) {
+        // continue non-parallel
+        return -1;
     }
-    todo[index] = q_create(2048, "todo");
-    if (todo[index] == NULL) {
-        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-        throw(MAL, "dataflow", "DFLOWinitialize(): Failed to create todo queue");
+    if ( todos[grp] )
+        return grp;
+
+    todos[grp] = q_create(2048, "todo");
+    if (todos[grp] == NULL)
+        return -1;
+
+    // associate a set of workers with the pool
+    for (i = 0; grp>= 0 && i < threads; i++){
+        if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
+            //Can not create interpreter thread
+            grp = -1;
+        }
+        if (worker == 0) {
+            //Failed to create interpreter thread
+            grp = -1;
+        }
     }
-    limit = GDKnr_threads ? GDKnr_threads : 1;
-    if (limit > THREADS) {
-        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-        throw(MAL, "dataflow", "DFLOWinitialize(): using more threads than thread slots: %d > %d", limit, THREADS);
-    }
-    for (worker = 0, i = 0; i < limit; i++){
-        for (; worker < THREADS; worker++)
-            if( workers[worker] == 0)
-                break;
-        if (worker >= THREADS || workers[worker] > 0) {
-            MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-            throw(MAL, "dataflow", "No free worker slot found");
-        }
-        if (MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE) < 0) {
-            MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-            throw(MAL, "dataflow", "Can not create interpreter thread");
-        }
-        if (workers[worker] == 0) {
-            MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-            throw(MAL, "dataflow", "Failed to create interpreter thread");
-        }
-        workerqueue[worker] = index + 1;
-    }
-    MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-    return MAL_SUCCEED;
+    return grp;
 }
 
 /*
@@ -582,7 +571,7 @@ static void showFlowEvent(DataFlow flow,
  */
 static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, Queue *todo)
 {
     int last;
     int i;
@@ -593,7 +582,6 @@ DFLOWscheduler(DataFlow flow)
     int tasks=0, actions;
     str ret = MAL_SUCCEED;
     FlowEvent fe, f = 0;
-    int wq;
 
     if (flow == NULL)
         throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
@@ -604,7 +592,6 @@ DFLOWscheduler(DataFlow flow)
     fe = flow->status;
 
     MT_lock_set(&flow->flowlock, "MALworker");
-    wq = flow->cntxt->idx;
     for (i = 0; i < actions; i++)
         if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
@@ -616,7 +603,7 @@ DFLOWscheduler(DataFlow flow)
             for (j = p->retc; j < p->argc; j++)
                 fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
 #endif
-            q_enqueue(todo[wq], flow->status + i);
+            q_enqueue(todo, flow->status + i);
             flow->status[i].state = DFLOWrunning;
             PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
         }
@@ -645,7 +632,7 @@ DFLOWscheduler(DataFlow flow)
                 if (flow->status[i].blocks == 1 ) {
                     flow->status[i].state = DFLOWrunning;
                     flow->status[i].blocks--;
-                    q_enqueue(todo[wq], flow->status + i);
+                    q_enqueue(todo, flow->status + i);
                     PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n",
                         flow->status[i].pc, flow->status[i].argclaim);
                 } else {
@@ -664,12 +651,12 @@ DFLOWscheduler(DataFlow flow)
 }
 
 str
-runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, MalStkPtr stk)
+runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
 {
     DataFlow flow = NULL;
-    str ret = MAL_SUCCEED;
-    int size;
-    int stoppc = getInstrPtr(mb,startpc)->jump;
+    str msg = MAL_SUCCEED;
+    int size, pool;
+    int *ret;
 
 #ifdef DEBUG_FLOW
     mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc, stoppc);
@@ -679,19 +666,24 @@ runMALdataflow(Client cntxt, MalBlkPtr m
     /* in debugging mode we should not start multiple threads */
     if (stk == NULL)
         throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
-    if (stk->cmd)
+    ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
+    if (stk->cmd){
+        *ret = TRUE;
         return MAL_SUCCEED;
+    }
 
     /* too many threads turns dataflow processing off */
-    if ( cntxt->idx > MAXQ)
+    if ( cntxt->idx > MAXQ){
+        *ret = TRUE;
         return MAL_SUCCEED;
+    }
 
-    assert(stoppc > startpc || stoppc == 0);
+    assert(stoppc > startpc);
 
-    /* check existence of workers */
-    if (todo[cntxt->idx] == 0)
-        ret = DFLOWinitialize(cntxt->idx);
-    if ( ret != MAL_SUCCEED)
-        return ret;
+    /* check existence of free worker group, resort to sequential upon failure */
+    if( (pool= DFLOWinitialize()) < 0){
+        *ret = TRUE;
+        return MAL_SUCCEED;
+    }
     flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
     if (flow == NULL)
@@ -711,6 +703,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
     if (flow->done == NULL) {
         MT_lock_destroy(&flow->flowlock);
         GDKfree(flow);
+        occupied[pool]= FALSE;
         throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue");
     }
 
@@ -719,6 +712,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         q_destroy(flow->done);
         MT_lock_destroy(&flow->flowlock);
         GDKfree(flow);
+        occupied[pool]= FALSE;
         throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate flow->status");
     }
     size = DFLOWgraphSize(mb, startpc, stoppc);
@@ -729,6 +723,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         q_destroy(flow->done);
         MT_lock_destroy(&flow->flowlock);
         GDKfree(flow);
+        occupied[pool]= FALSE;
         throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate flow->nodes");
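
For readers skimming the truncated diff above: the new DFLOWinitialize() replaces the global worker table with a fixed set of todo queues whose slots are claimed through an occupied[] flag array under mal_contextLock, and runMALdataflow() now falls back to sequential execution whenever no pool can be obtained. The following standalone sketch is not part of the changeset; it restates that claim-a-slot-or-run-serially pattern with plain pthreads instead of the MonetDB MT_* wrappers, and the names pool_init, worker and NWORKERS are illustrative only.

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MAXQ 256      /* maximum number of worker pools, as in the patch */
#define NWORKERS 4    /* stand-in for GDKnr_threads */

static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
static bool occupied[MAXQ];               /* worker pool is in use? */
static pthread_t workers[MAXQ][NWORKERS]; /* threads bound to each pool */

/* A real worker would dequeue FlowEvents from its pool's todo queue;
 * here it only reports which pool it serves. */
static void *worker(void *arg)
{
    int grp = (int) (intptr_t) arg;
    printf("worker started for pool %d\n", grp);
    return NULL;
}

/* Returns a pool index >= 0, or -1 to request sequential execution. */
static int pool_init(void)
{
    int grp, i;

    /* claim a free slot under the lock, mirroring the occupied[] scan */
    pthread_mutex_lock(&pool_lock);
    for (grp = 0; grp < MAXQ; grp++)
        if (!occupied[grp]) {
            occupied[grp] = true;
            break;
        }
    pthread_mutex_unlock(&pool_lock);
    if (grp == MAXQ)
        return -1;    /* no free pool: continue non-parallel */

    /* bind a set of workers to the pool; failure is not an error,
     * it merely forces serial execution (the BEWARE note in the patch) */
    for (i = 0; i < NWORKERS; i++)
        if (pthread_create(&workers[grp][i], NULL, worker,
                           (void *) (intptr_t) grp) != 0) {
            occupied[grp] = false;    /* release the slot in this sketch */
            return -1;
        }
    return grp;
}

int main(void)
{
    int i, pool = pool_init();

    if (pool < 0) {
        printf("falling back to serial execution\n");
        return 0;
    }
    printf("using worker pool %d\n", pool);
    for (i = 0; i < NWORKERS; i++)            /* the patch keeps its workers alive; */
        pthread_join(workers[pool][i], NULL); /* the sketch joins them before exit */
    occupied[pool] = false;                   /* analogous to occupied[pool]= FALSE */
    return 0;
}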