Changeset: 3e23519c40ce for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3e23519c40ce
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Create a pool of threads that the dataflow can use.


diffs (truncated from 492 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,16 +80,17 @@ typedef struct DATAFLOW {
 
 static struct worker {
        MT_Id id;
-       enum {IDLE, RUNNING, JOINING, EXITED} flag;
+       enum {IDLE, WAITING, RUNNING, FREE } flag;
        ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
        char *errbuf;              /* GDKerrbuf so that we can allocate before 
fork */
        MT_Sema s;
+       int self;
        int next;
 } workers[THREADS];
 /* heads of two mutually exclusive linked lists, both using the .next
  * field in the worker struct */
-static int exited_workers = -1;        /* to be joined threads */
 static int idle_workers = -1;  /* idle workers (no thread associated) */
+static int free_workers = -1;  /* free workers (thread doing nothing) */
 
 static Queue *todo = 0;        /* pending instructions */
 
@@ -102,7 +103,6 @@ mal_dataflow_reset(void)
 {
        stopMALdataflow();
        memset((char*) workers, 0,  sizeof(workers));
-       exited_workers = -1;
        idle_workers = -1;
        if( todo) {
                GDKfree(todo->data);
@@ -292,161 +292,176 @@ static void
 DFLOWworker(void *T)
 {
        struct worker *t = (struct worker *) T;
-       DataFlow flow;
-       FlowEvent fe = 0, fnxt = 0;
-       str error = 0;
-       int i;
-       lng claim;
-       Client cntxt;
-       InstrPtr p;
-
 #ifdef _MSC_VER
        srand((unsigned int) GDKusec());
 #endif
        assert(t->errbuf != NULL);
        GDKsetbuf(t->errbuf);           /* where to leave errors */
        t->errbuf = NULL;
-       GDKclrerr();
+
+       for (;;) {
+               DataFlow flow;
+               FlowEvent fe = 0, fnxt = 0;
+               str error = 0;
+               int i;
+               lng claim;
+               Client cntxt;
+               InstrPtr p;
+
+               GDKclrerr();
 
-       cntxt = ATOMIC_PTR_GET(&t->cntxt);
-       if (cntxt) {
-               /* wait until we are allowed to start working */
-               MT_sema_down(&t->s);
-       }
-       while (1) {
-               if (fnxt == 0) {
-                       MT_thread_setworking(NULL);
-                       cntxt = ATOMIC_PTR_GET(&t->cntxt);
-                       fe = q_dequeue(todo, cntxt);
-                       if (fe == NULL) {
-                               if (cntxt) {
-                                       /* we're not done yet with work for the 
current
-                                        * client (as far as we know), so give 
up the CPU
-                                        * and let the scheduler enter some 
more work, but
-                                        * first compensate for the down we did 
in
-                                        * dequeue */
-                                       MT_sema_up(&todo->s);
-                                       MT_sleep_ms(1);
-                                       continue;
+               if (t->flag == WAITING) {
+                       /* wait until we are allowed to start working */
+                       MT_sema_down(&t->s);
+                       t->flag = RUNNING;
+               }
+               assert(t->flag == RUNNING);
+               cntxt = ATOMIC_PTR_GET(&t->cntxt);
+               while (1) {
+                       if (fnxt == 0) {
+                               MT_thread_setworking(NULL);
+                               cntxt = ATOMIC_PTR_GET(&t->cntxt);
+                               fe = q_dequeue(todo, cntxt);
+                               if (fe == NULL) {
+                                       if (cntxt) {
+                                               /* we're not done yet with work 
for the current
+                                                * client (as far as we know), 
so give up the CPU
+                                                * and let the scheduler enter 
some more work, but
+                                                * first compensate for the 
down we did in
+                                                * dequeue */
+                                               MT_sema_up(&todo->s);
+                                               MT_sleep_ms(1);
+                                               continue;
+                                       }
+                                       /* no more work to be done: exit */
+                                       break;
                                }
-                               /* no more work to be done: exit */
+                               if (fe->flow->cntxt && 
fe->flow->cntxt->mythread)
+                                       
MT_thread_setworking(fe->flow->cntxt->mythread->name);
+                       } else
+                               fe = fnxt;
+                       if (ATOMIC_GET(&exiting)) {
                                break;
                        }
-                       if (fe->flow->cntxt && fe->flow->cntxt->mythread)
-                               
MT_thread_setworking(fe->flow->cntxt->mythread->name);
-               } else
-                       fe = fnxt;
-               if (ATOMIC_GET(&exiting)) {
-                       break;
-               }
-               fnxt = 0;
-               assert(fe);
-               flow = fe->flow;
-               assert(flow);
+                       fnxt = 0;
+                       assert(fe);
+                       flow = fe->flow;
+                       assert(flow);
 
-               /* whenever we have a (concurrent) error, skip it */
-               if (ATOMIC_PTR_GET(&flow->error)) {
-                       q_enqueue(flow->done, fe);
-                       continue;
-               }
-
-               p= getInstrPtr(flow->mb,fe->pc);
-               claim = fe->argclaim;
-               if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, 
claim)) {
-                       // never block on deblockdataflow()
-                       if( p->fcn != (MALfcn) deblockdataflow){
-                               fe->hotclaim = 0;   /* don't assume priority 
anymore */
-                               fe->maxclaim = 0;
-                               if (todo->last == 0)
-                                       MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo, fe);
+                       /* whenever we have a (concurrent) error, skip it */
+                       if (ATOMIC_PTR_GET(&flow->error)) {
+                               q_enqueue(flow->done, fe);
                                continue;
                        }
-               }
-               error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 
1, flow->stk, 0, 0);
-               /* release the memory claim */
-               MALadmission_release(flow->cntxt, flow->mb, flow->stk, p,  
claim);
 
-               MT_lock_set(&flow->flowlock);
-               fe->state = DFLOWwrapup;
-               MT_lock_unset(&flow->flowlock);
-               if (error) {
-                       void *null = NULL;
-                       /* only collect one error (from one thread, needed for 
stable testing) */
-                       if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
-                               freeException(error);
-                       /* after an error we skip the rest of the block */
-                       q_enqueue(flow->done, fe);
-                       continue;
-               }
+                       p= getInstrPtr(flow->mb,fe->pc);
+                       claim = fe->argclaim;
+                       if (MALadmission_claim(flow->cntxt, flow->mb, 
flow->stk, p, claim)) {
+                               // never block on deblockdataflow()
+                               if( p->fcn != (MALfcn) deblockdataflow){
+                                       fe->hotclaim = 0;   /* don't assume 
priority anymore */
+                                       fe->maxclaim = 0;
+                                       if (todo->last == 0)
+                                               MT_sleep_ms(DELAYUNIT);
+                                       q_requeue(todo, fe);
+                                       continue;
+                               }
+                       }
+                       error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1, flow->stk, 0, 0);
+                       /* release the memory claim */
+                       MALadmission_release(flow->cntxt, flow->mb, flow->stk, 
p,  claim);
 
-               /* see if you can find an eligible instruction that uses the
-                * result just produced. Then we can continue with it right 
away.
-                * We are just looking forward for the last block, which means 
we
-                * are safe from concurrent actions. No other thread can steal 
it,
-                * because we hold the logical lock.
-                * All eligible instructions are queued
-                */
-       {
-               InstrPtr p = getInstrPtr(flow->mb, fe->pc);
-               assert(p);
-               fe->hotclaim = 0;
-               fe->maxclaim = 0;
+                       MT_lock_set(&flow->flowlock);
+                       fe->state = DFLOWwrapup;
+                       MT_lock_unset(&flow->flowlock);
+                       if (error) {
+                               void *null = NULL;
+                               /* only collect one error (from one thread, 
needed for stable testing) */
+                               if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
+                                       freeException(error);
+                               /* after an error we skip the rest of the block 
*/
+                               q_enqueue(flow->done, fe);
+                               continue;
+                       }
 
-               for (i = 0; i < p->retc; i++){
-                       lng footprint;
-                       footprint = getMemoryClaim(flow->mb, flow->stk, p, i, 
FALSE);
-                       fe->hotclaim += footprint;
-                       if( footprint > fe->maxclaim) fe->maxclaim = footprint;
-               }
-       }
+                       /* see if you can find an eligible instruction that 
uses the
+                        * result just produced. Then we can continue with it 
right away.
+                        * We are just looking forward for the last block, 
which means we
+                        * are safe from concurrent actions. No other thread 
can steal it,
+                        * because we hold the logical lock.
+                        * All eligible instructions are queued
+                        */
+                       p = getInstrPtr(flow->mb, fe->pc);
+                       assert(p);
+                       fe->hotclaim = 0;
+                       fe->maxclaim = 0;
+
+                       for (i = 0; i < p->retc; i++){
+                               lng footprint;
+                               footprint = getMemoryClaim(flow->mb, flow->stk, 
p, i, FALSE);
+                               fe->hotclaim += footprint;
+                               if( footprint > fe->maxclaim)
+                                       fe->maxclaim = footprint;
+                       }
+
 /* Try to get rid of the hot potato or locate an alternative to proceed.
  */
 #define HOTPOTATO
 #ifdef HOTPOTATO
-       /* HOT potato choice */
-       int last = 0, nxt = -1;
-       lng nxtclaim = -1;
+                       /* HOT potato choice */
+                       int last = 0, nxt = -1;
+                       lng nxtclaim = -1;
 
-       MT_lock_set(&flow->flowlock);
-       for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) 
> 0; last = flow->edges[last]){
-               if (flow->status[i].state == DFLOWpending && 
flow->status[i].blocks == 1) {
-                       /* find the one with the largest footprint */
-                       if( nxt == -1){
-                               nxt = i;
-                               nxtclaim = flow->status[i].argclaim;
+                       MT_lock_set(&flow->flowlock);
+                       for (last = fe->pc - flow->start; last >= 0 && (i = 
flow->nodes[last]) > 0; last = flow->edges[last]){
+                               if (flow->status[i].state == DFLOWpending && 
flow->status[i].blocks == 1) {
+                                       /* find the one with the largest 
footprint */
+                                       if( nxt == -1){
+                                               nxt = i;
+                                               nxtclaim = 
flow->status[i].argclaim;
+                                       }
+                                       if( flow->status[i].argclaim > 
nxtclaim){
+                                               nxt = i;
+                                               nxtclaim =  
flow->status[i].argclaim;
+                                       }
+                               }
                        }
-                       if( flow->status[i].argclaim > nxtclaim){
-                               nxt = i;
-                               nxtclaim =  flow->status[i].argclaim;
+                       /* hot potato can not be removed, use alternative to 
proceed */
+                       if( nxt >= 0){
+                               flow->status[nxt].state = DFLOWrunning;
+                               flow->status[nxt].blocks = 0;
+                               flow->status[nxt].hotclaim = fe->hotclaim;
+                               flow->status[nxt].argclaim += fe->hotclaim;
+                               if( flow->status[nxt].maxclaim < fe->maxclaim)
+                                       flow->status[nxt].maxclaim = 
fe->maxclaim;
+                               fnxt = flow->status + nxt;
+                       }
+                       MT_lock_unset(&flow->flowlock);
+#endif
+
+                       q_enqueue(flow->done, fe);
+                       if ( fnxt == 0 && malProfileMode) {
+                               profilerHeartbeatEvent("wait");
                        }
                }
+               MT_lock_set(&dataflowLock);
+               if (GDKexiting() || ATOMIC_GET(&exiting)) {
+                       MT_lock_unset(&dataflowLock);
+                       break;
+               }
+               t->flag = FREE;
+               assert(free_workers != t->self);
+               t->next = free_workers;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to