Changeset: 3669ddd28bf0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3669ddd28bf0
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:
New thread group per dataflow block

The centralized worker thread group could lead to an unacceptable situation:
if one user was heavily processing complex queries, no other user could even
log into the system, because their MAL statements ended up at the end of the
shared queues. The problem has been resolved by introducing a thread group
per dataflow block. This may lead to a large number of threads, whose
resources are managed by the OS. It solves bug 3258. (A minimal sketch of
the per-block queue and sentinel-shutdown pattern follows the diff below.)

diffs (235 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -78,12 +78,12 @@ typedef struct DATAFLOW {
 	int *nodes;         /* dependency graph nodes */
 	int *edges;         /* dependency graph */
 	MT_Lock flowlock;   /* lock to protect the above */
+	queue *todo;        /* pending instructions */
 	queue *done;        /* instructions handled */
+	int threads;        /* worker threads active */
+	MT_Id workers[THREADS];
 } *DataFlow, DataFlowRec;
 
-static MT_Id workers[THREADS];
-static queue *todo = 0;	/* pending instructions */
-
 /*
  * Calculate the size of the dataflow dependency graph.
  */
@@ -138,7 +138,6 @@ q_destroy(queue *q)
 static void
 q_enqueue_(queue *q, FlowEvent d)
 {
-	assert(d);
 	if (q->last == q->size) {
 		q->size <<= 1;
 		q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
@@ -214,7 +213,6 @@ q_dequeue(queue *q)
 	 */
 	MT_lock_unset(&q->l, "q_dequeue");
 
-	assert(r);
 	return r;
 }
 
@@ -239,14 +237,15 @@ q_dequeue(queue *q)
 static void
 DFLOWworker(void *t)
 {
-	DataFlow flow;
+	DataFlow flow = (DataFlow) t;
 	FlowEvent fe = 0, fnxt = 0;
-	int id = (int) ((MT_Id *) t - workers), last = 0;
+	MT_Id id = MT_getpid();
+	int last = 0;
 	Thread thr;
 	str error = 0;
 	int i;
-	lng usec = 0;
+	//lng usec = 0;
 
 	thr = THRnew("DFLOWworker");
 
@@ -254,8 +253,10 @@ DFLOWworker(void *t)
 	GDKerrbuf[0] = 0;
 	while (1) {
 		if (fnxt == 0)
-			fe = q_dequeue(todo);
+			fe = q_dequeue(flow->todo);
 		else fe = fnxt;
+		if ( fe == 0)
+			break;
 		fnxt = 0;
 		assert(fe);
 		flow = fe->flow;
@@ -266,20 +267,20 @@ DFLOWworker(void *t)
 			continue;
 		}
 
-		usec = GDKusec();
+		//usec = GDKusec();
 		/* skip all instructions when we have encontered an error */
 		if (flow->error == 0) {
 #ifdef USE_MAL_ADMISSION
 			if (MALadmission(fe->argclaim, fe->hotclaim)) {
 				fe->hotclaim = 0;   /* don't assume priority anymore */
-				if (todo->last == 0)
+				if (flow->todo->last == 0)
 					MT_sleep_ms(DELAYUNIT);
-				q_requeue(todo, fe);
+				q_requeue(flow->todo, fe);
 				continue;
 			}
 #endif
 			error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
-			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
+			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= "SZFMT" claim= " LLFMT "," LLFMT " %s\n",
 				fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
 		/* release the memory claim */
@@ -330,12 +331,15 @@ DFLOWworker(void *t)
 		q_enqueue(flow->done, fe);
 		if ( fnxt == 0) {
-			if (todo->last == 0)
+			if (flow->todo->last == 0)
 				profilerHeartbeatEvent("wait");
-			else
-				MALresourceFairness(NULL, NULL, usec);
+			//else
+				//MALresourceFairness(NULL, NULL, usec);
 		}
 	}
+	for( i = 0; i< flow->threads; i++)
+		if ( flow->workers[i] == id)
+			flow->workers[i] = 0;
 	GDKfree(GDKerrbuf);
 	GDKsetbuf(0);
 	THRdel(thr);
@@ -349,22 +353,51 @@ DFLOWworker(void *t)
  * The workers are assembled in a local table to enable debugging.
  */
 static void
-DFLOWinitialize(void)
+DFLOWinitialize(DataFlow flow, int size)
 {
-	int i, limit;
+	int i;
 
-	MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-	if (todo) {
-		MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-		return;
+	MT_lock_init(&flow->flowlock, "DFLOWworker");
+	flow->todo = q_create(size);
+	flow->done = q_create(size);
+	flow->threads = GDKnr_threads ? GDKnr_threads :1;
+	for (i = 0; i < flow->threads; i++){
+		MT_create_thread(&flow->workers[i], DFLOWworker, (void *) flow, MT_THR_JOINABLE);
+		/* upon failure of starting threads we reduce the count */
+		if ( flow->workers[i]== 0){
+			flow->threads --;
+			i--;
+		}
 	}
-	todo = q_create(2048);
-	limit = GDKnr_threads ? GDKnr_threads : 1;
-	for (i = 0; i < limit; i++)
-		MT_create_thread(&workers[i], DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE);
-	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
 }
 
+static str
+DFLOWfinalize(DataFlow flow)
+{
+	int i, cnt= flow->threads, runs =0;
+
+	for( i = 0; i< cnt; i++)
+		q_enqueue(flow->todo, 0);
+	/* time out when threads are already killed */
+	do{
+		runs++;
+		cnt = 0;
+		MT_sleep_ms(1);
+		for( i = 0; i < flow->threads; i++)
+			cnt += flow->workers[i] ==0;
+	} while( cnt != flow->threads && runs <5000);
+
+	if ( runs == 5000)
+		throw(MAL,"dataflow","Timeout on thread termination");
+	GDKfree(flow->status);
+	GDKfree(flow->edges);
+	GDKfree(flow->nodes);
+	q_destroy(flow->done);
+	q_destroy(flow->todo);
+	MT_lock_destroy(&flow->flowlock);
+	GDKfree(flow);
+	return MAL_SUCCEED;
+}
 /*
  * The dataflow administration is based on administration of
  * how many variables are still missing before it can be executed.
@@ -518,7 +551,7 @@ DFLOWscheduler(DataFlow flow)
 			for (j = p->retc; j < p->argc; j++)
 				fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE);
 #endif
-			q_enqueue(todo, flow->status + i);
+			q_enqueue(flow->todo, flow->status + i);
 			flow->status[i].state = DFLOWrunning;
 			PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
 		}
@@ -543,7 +576,7 @@ DFLOWscheduler(DataFlow flow)
 				if (flow->status[i].blocks == 1 ) {
 					flow->status[i].state = DFLOWrunning;
 					flow->status[i].blocks--;
-					q_enqueue(todo, flow->status + i);
+					q_enqueue(flow->todo, flow->status + i);
 					PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
 				} else {
@@ -579,13 +612,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 	assert(stoppc > startpc);
 
-	/* check existence of workers */
-	if (workers[0] == 0)
-		DFLOWinitialize();
-	assert(workers[0]);
-	assert(todo);
-
 	flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+
+	DFLOWinitialize(flow, stoppc- startpc +1);
+	assert(flow->todo);
+	assert(flow->done);
 
 	flow->cntxt = cntxt;
 	flow->mb = mb;
@@ -596,9 +627,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 	flow->start = startpc + 1;
 	flow->stop = stoppc;
 
-	MT_lock_init(&flow->flowlock, "DFLOWworker");
-	flow->done = q_create(stoppc- startpc+1);
-
 	flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * sizeof(FlowEventRec));
 	size = DFLOWgraphSize(mb, startpc, stoppc);
 	size += stoppc - startpc;
@@ -608,11 +636,9 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 
 	ret = DFLOWscheduler(flow);
 
-	GDKfree(flow->status);
-	GDKfree(flow->edges);
-	GDKfree(flow->nodes);
-	q_destroy(flow->done);
-	MT_lock_destroy(&flow->flowlock);
-	GDKfree(flow);
+	if( ret == MAL_SUCCEED)
+		ret = DFLOWfinalize(flow);
+	else (void)
+		DFLOWfinalize(flow);
 
 	return ret;
 }
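For readers who want the gist without wading through the diff: below is a
minimal, self-contained sketch of the pattern this changeset adopts. Each
dataflow block owns its own todo queue and worker group, and shutdown is
signalled by enqueuing one NULL sentinel per worker (compare DFLOWfinalize
enqueuing a 0 for every worker, and DFLOWworker breaking out of its loop
when fe == 0). This is an illustration under stated assumptions, not
MonetDB code: it uses plain POSIX threads instead of the MT_/queue
primitives, the names block_t, task_t, NTHREADS and QSIZE are invented for
the example, and it joins its workers directly where the commit instead
polls the workers[] table with a timeout.

/* Sketch: one worker group per block, terminated by NULL sentinels.
 * All names here are hypothetical; only the pattern mirrors the commit. */
#include <pthread.h>
#include <stdio.h>

#define NTHREADS 4
#define QSIZE    64

typedef struct { void (*run)(int); int arg; } task_t;

typedef struct {
	task_t *slot[QSIZE];          /* NULL entry = shutdown sentinel */
	int head, tail;
	pthread_mutex_t lock;
	pthread_cond_t nonempty;
	pthread_t workers[NTHREADS];
} block_t;

static void q_put(block_t *b, task_t *t) {
	pthread_mutex_lock(&b->lock);
	b->slot[b->tail++ % QSIZE] = t;
	pthread_cond_signal(&b->nonempty);
	pthread_mutex_unlock(&b->lock);
}

static task_t *q_get(block_t *b) {
	pthread_mutex_lock(&b->lock);
	while (b->head == b->tail)
		pthread_cond_wait(&b->nonempty, &b->lock);
	task_t *t = b->slot[b->head++ % QSIZE];
	pthread_mutex_unlock(&b->lock);
	return t;
}

static void *worker(void *arg) {
	block_t *b = arg;
	for (;;) {
		task_t *t = q_get(b);     /* blocks only on this block's queue */
		if (t == NULL)            /* sentinel: this worker exits */
			break;
		t->run(t->arg);
	}
	return NULL;
}

static void block_init(block_t *b) {
	b->head = b->tail = 0;
	pthread_mutex_init(&b->lock, NULL);
	pthread_cond_init(&b->nonempty, NULL);
	for (int i = 0; i < NTHREADS; i++)
		pthread_create(&b->workers[i], NULL, worker, b);
}

static void block_finalize(block_t *b) {
	for (int i = 0; i < NTHREADS; i++)
		q_put(b, NULL);           /* one sentinel per worker */
	for (int i = 0; i < NTHREADS; i++)
		pthread_join(b->workers[i], NULL);
	pthread_mutex_destroy(&b->lock);
	pthread_cond_destroy(&b->nonempty);
}

static void say(int i) { printf("task %d done\n", i); }

int main(void) {
	block_t b;                    /* one group per "dataflow block" */
	task_t tasks[8];
	block_init(&b);
	for (int i = 0; i < 8; i++) {
		tasks[i].run = say;
		tasks[i].arg = i;
		q_put(&b, &tasks[i]);
	}
	block_finalize(&b);           /* drains workers, then joins them */
	return 0;
}

The trade-off named in the log message is visible here as well: every block
pays for its own thread group, but because there is no shared queue, one
user's long-running block can no longer starve another user's newly
arriving work.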