Changeset: 3e23519c40ce for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/3e23519c40ce Modified Files: monetdb5/mal/mal_dataflow.c Branch: default Log Message:
Create a pool of threads that the dataflow can use. diffs (truncated from 492 to 300 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -80,16 +80,17 @@ typedef struct DATAFLOW { static struct worker { MT_Id id; - enum {IDLE, RUNNING, JOINING, EXITED} flag; + enum {IDLE, WAITING, RUNNING, FREE } flag; ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */ char *errbuf; /* GDKerrbuf so that we can allocate before fork */ MT_Sema s; + int self; int next; } workers[THREADS]; /* heads of two mutually exclusive linked lists, both using the .next * field in the worker struct */ -static int exited_workers = -1; /* to be joined threads */ static int idle_workers = -1; /* idle workers (no thread associated) */ +static int free_workers = -1; /* free workers (thread doing nothing) */ static Queue *todo = 0; /* pending instructions */ @@ -102,7 +103,6 @@ mal_dataflow_reset(void) { stopMALdataflow(); memset((char*) workers, 0, sizeof(workers)); - exited_workers = -1; idle_workers = -1; if( todo) { GDKfree(todo->data); @@ -292,161 +292,176 @@ static void DFLOWworker(void *T) { struct worker *t = (struct worker *) T; - DataFlow flow; - FlowEvent fe = 0, fnxt = 0; - str error = 0; - int i; - lng claim; - Client cntxt; - InstrPtr p; - #ifdef _MSC_VER srand((unsigned int) GDKusec()); #endif assert(t->errbuf != NULL); GDKsetbuf(t->errbuf); /* where to leave errors */ t->errbuf = NULL; - GDKclrerr(); + + for (;;) { + DataFlow flow; + FlowEvent fe = 0, fnxt = 0; + str error = 0; + int i; + lng claim; + Client cntxt; + InstrPtr p; + + GDKclrerr(); - cntxt = ATOMIC_PTR_GET(&t->cntxt); - if (cntxt) { - /* wait until we are allowed to start working */ - MT_sema_down(&t->s); - } - while (1) { - if (fnxt == 0) { - MT_thread_setworking(NULL); - cntxt = ATOMIC_PTR_GET(&t->cntxt); - fe = q_dequeue(todo, cntxt); - if (fe == NULL) { - if (cntxt) { - /* we're not done yet with work for the current - * client (as far as we know), so give up the CPU - * and let the scheduler enter some more work, but - * first compensate for the down we did in - * dequeue */ - MT_sema_up(&todo->s); - MT_sleep_ms(1); - continue; + if (t->flag == WAITING) { + /* wait until we are allowed to start working */ + MT_sema_down(&t->s); + t->flag = RUNNING; + } + assert(t->flag == RUNNING); + cntxt = ATOMIC_PTR_GET(&t->cntxt); + while (1) { + if (fnxt == 0) { + MT_thread_setworking(NULL); + cntxt = ATOMIC_PTR_GET(&t->cntxt); + fe = q_dequeue(todo, cntxt); + if (fe == NULL) { + if (cntxt) { + /* we're not done yet with work for the current + * client (as far as we know), so give up the CPU + * and let the scheduler enter some more work, but + * first compensate for the down we did in + * dequeue */ + MT_sema_up(&todo->s); + MT_sleep_ms(1); + continue; + } + /* no more work to be done: exit */ + break; } - /* no more work to be done: exit */ + if (fe->flow->cntxt && fe->flow->cntxt->mythread) + MT_thread_setworking(fe->flow->cntxt->mythread->name); + } else + fe = fnxt; + if (ATOMIC_GET(&exiting)) { break; } - if (fe->flow->cntxt && fe->flow->cntxt->mythread) - MT_thread_setworking(fe->flow->cntxt->mythread->name); - } else - fe = fnxt; - if (ATOMIC_GET(&exiting)) { - break; - } - fnxt = 0; - assert(fe); - flow = fe->flow; - assert(flow); + fnxt = 0; + assert(fe); + flow = fe->flow; + assert(flow); - /* whenever we have a (concurrent) error, skip it */ - if (ATOMIC_PTR_GET(&flow->error)) { - q_enqueue(flow->done, fe); - continue; - } - - p= getInstrPtr(flow->mb,fe->pc); - claim = fe->argclaim; - if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) { - // never block on deblockdataflow() - if( p->fcn != (MALfcn) deblockdataflow){ - fe->hotclaim = 0; /* don't assume priority anymore */ - fe->maxclaim = 0; - if (todo->last == 0) - MT_sleep_ms(DELAYUNIT); - q_requeue(todo, fe); + /* whenever we have a (concurrent) error, skip it */ + if (ATOMIC_PTR_GET(&flow->error)) { + q_enqueue(flow->done, fe); continue; } - } - error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); - /* release the memory claim */ - MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim); - MT_lock_set(&flow->flowlock); - fe->state = DFLOWwrapup; - MT_lock_unset(&flow->flowlock); - if (error) { - void *null = NULL; - /* only collect one error (from one thread, needed for stable testing) */ - if (!ATOMIC_PTR_CAS(&flow->error, &null, error)) - freeException(error); - /* after an error we skip the rest of the block */ - q_enqueue(flow->done, fe); - continue; - } + p= getInstrPtr(flow->mb,fe->pc); + claim = fe->argclaim; + if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) { + // never block on deblockdataflow() + if( p->fcn != (MALfcn) deblockdataflow){ + fe->hotclaim = 0; /* don't assume priority anymore */ + fe->maxclaim = 0; + if (todo->last == 0) + MT_sleep_ms(DELAYUNIT); + q_requeue(todo, fe); + continue; + } + } + error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); + /* release the memory claim */ + MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim); - /* see if you can find an eligible instruction that uses the - * result just produced. Then we can continue with it right away. - * We are just looking forward for the last block, which means we - * are safe from concurrent actions. No other thread can steal it, - * because we hold the logical lock. - * All eligible instructions are queued - */ - { - InstrPtr p = getInstrPtr(flow->mb, fe->pc); - assert(p); - fe->hotclaim = 0; - fe->maxclaim = 0; + MT_lock_set(&flow->flowlock); + fe->state = DFLOWwrapup; + MT_lock_unset(&flow->flowlock); + if (error) { + void *null = NULL; + /* only collect one error (from one thread, needed for stable testing) */ + if (!ATOMIC_PTR_CAS(&flow->error, &null, error)) + freeException(error); + /* after an error we skip the rest of the block */ + q_enqueue(flow->done, fe); + continue; + } - for (i = 0; i < p->retc; i++){ - lng footprint; - footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE); - fe->hotclaim += footprint; - if( footprint > fe->maxclaim) fe->maxclaim = footprint; - } - } + /* see if you can find an eligible instruction that uses the + * result just produced. Then we can continue with it right away. + * We are just looking forward for the last block, which means we + * are safe from concurrent actions. No other thread can steal it, + * because we hold the logical lock. + * All eligible instructions are queued + */ + p = getInstrPtr(flow->mb, fe->pc); + assert(p); + fe->hotclaim = 0; + fe->maxclaim = 0; + + for (i = 0; i < p->retc; i++){ + lng footprint; + footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE); + fe->hotclaim += footprint; + if( footprint > fe->maxclaim) + fe->maxclaim = footprint; + } + /* Try to get rid of the hot potato or locate an alternative to proceed. */ #define HOTPOTATO #ifdef HOTPOTATO - /* HOT potato choice */ - int last = 0, nxt = -1; - lng nxtclaim = -1; + /* HOT potato choice */ + int last = 0, nxt = -1; + lng nxtclaim = -1; - MT_lock_set(&flow->flowlock); - for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]){ - if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) { - /* find the one with the largest footprint */ - if( nxt == -1){ - nxt = i; - nxtclaim = flow->status[i].argclaim; + MT_lock_set(&flow->flowlock); + for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]){ + if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) { + /* find the one with the largest footprint */ + if( nxt == -1){ + nxt = i; + nxtclaim = flow->status[i].argclaim; + } + if( flow->status[i].argclaim > nxtclaim){ + nxt = i; + nxtclaim = flow->status[i].argclaim; + } + } } - if( flow->status[i].argclaim > nxtclaim){ - nxt = i; - nxtclaim = flow->status[i].argclaim; + /* hot potato can not be removed, use alternative to proceed */ + if( nxt >= 0){ + flow->status[nxt].state = DFLOWrunning; + flow->status[nxt].blocks = 0; + flow->status[nxt].hotclaim = fe->hotclaim; + flow->status[nxt].argclaim += fe->hotclaim; + if( flow->status[nxt].maxclaim < fe->maxclaim) + flow->status[nxt].maxclaim = fe->maxclaim; + fnxt = flow->status + nxt; + } + MT_lock_unset(&flow->flowlock); +#endif + + q_enqueue(flow->done, fe); + if ( fnxt == 0 && malProfileMode) { + profilerHeartbeatEvent("wait"); } } + MT_lock_set(&dataflowLock); + if (GDKexiting() || ATOMIC_GET(&exiting)) { + MT_lock_unset(&dataflowLock); + break; + } + t->flag = FREE; + assert(free_workers != t->self); + t->next = free_workers; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list