Changeset: 63a670bf0ccd for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=63a670bf0ccd Modified Files: monetdb5/mal/mal_dataflow.c Branch: default Log Message:
Keep the administration on events consistent diffs (77 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -253,6 +253,7 @@ DFLOWworker(void *t) /* whenever we have a (concurrent) error, skip it */ if (flow->error) { + fe->state = DFLOWwrapup; q_enqueue(flow->done, fe); continue; } @@ -277,7 +278,6 @@ DFLOWworker(void *t) MALadmission(-fe->argclaim, -fe->hotclaim); #endif - fe->state = DFLOWwrapup; if (error) { MT_lock_set(&flow->flowlock, "runMALdataflow"); if (flow->error) { @@ -294,6 +294,7 @@ DFLOWworker(void *t) flow->error = error; MT_lock_unset(&flow->flowlock, "runMALdataflow"); /* after an error we skip the rest of the block */ + fe->state = DFLOWwrapup; q_enqueue(flow->done, fe); continue; } @@ -301,10 +302,9 @@ DFLOWworker(void *t) /* see if you can find an eligible instruction that uses the * result just produced. Then we can continue with it right away. - * We are just looking forward for the last block, which means we - * are safe from concurrent actions. No other thread can steal it, - * because we hold the logical lock. - * All eligible instructions are queued + * We are just looking forward and hold the logical locks on the + * target variables just produced. + * It means we are safe from concurrent actions. No other thread can steal it. */ #ifdef USE_MAL_ADMISSION fe->hotclaim = 0; @@ -312,7 +312,6 @@ DFLOWworker(void *t) for (i = 0; i < p->retc; i++) fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE); #endif - MT_lock_set(&flow->flowlock, "MALworker"); for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]) if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) { @@ -320,16 +319,11 @@ DFLOWworker(void *t) flow->status[i].blocks = 0; flow->status[i].hotclaim = fe->hotclaim; flow->status[i].argclaim += fe->hotclaim; - if (fnxt) { - if ( flow->error) - q_enqueue(flow->done, fnxt); - else - q_enqueue(todo, fnxt); - } fnxt = flow->status + i; + break; } - MT_lock_unset(&flow->flowlock, "MALworker"); + fe->state = DFLOWwrapup; q_enqueue(flow->done, fe); MALresourceFairness(flow->cntxt, flow->mb, usec); } @@ -535,7 +529,7 @@ DFLOWscheduler(DataFlow flow) flow->status[i].argclaim += f->hotclaim; if (flow->status[i].blocks == 1 ) { flow->status[i].state = DFLOWrunning; - flow->status[i].blocks--; + flow->status[i].blocks= 0; q_enqueue(todo, flow->status + i); PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list