Changeset: 63a670bf0ccd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=63a670bf0ccd
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Keep the administration on events consistent


diffs (77 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -253,6 +253,7 @@ DFLOWworker(void *t)
 
                /* whenever we have a (concurrent) error, skip it */
                if (flow->error) {
+                       fe->state = DFLOWwrapup;
                        q_enqueue(flow->done, fe);
                        continue;
                }
@@ -277,7 +278,6 @@ DFLOWworker(void *t)
                        MALadmission(-fe->argclaim, -fe->hotclaim);
 #endif
 
-                       fe->state = DFLOWwrapup;
                        if (error) {
                                MT_lock_set(&flow->flowlock, "runMALdataflow");
                                if (flow->error) {
@@ -294,6 +294,7 @@ DFLOWworker(void *t)
                                        flow->error = error;
                                MT_lock_unset(&flow->flowlock, "runMALdataflow");
                                /* after an error we skip the rest of the block */
+                               fe->state = DFLOWwrapup;
                                q_enqueue(flow->done, fe);
                                continue;
                        }
@@ -301,10 +302,9 @@ DFLOWworker(void *t)
 
                /* see if you can find an eligible instruction that uses the
                 * result just produced. Then we can continue with it right away.
-                * We are just looking forward for the last block, which means we
-                * are safe from concurrent actions. No other thread can steal it,
-                * because we hold the logical lock.
-                * All eligible instructions are queued
+                * We are just looking forward and hold the logical locks on the
+                * target variables just produced. 
+                * It means we are safe from concurrent actions. No other thread can steal it.
                 */
 #ifdef USE_MAL_ADMISSION
                fe->hotclaim = 0;
@@ -312,7 +312,6 @@ DFLOWworker(void *t)
                for (i = 0; i < p->retc; i++)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
 #endif
-               MT_lock_set(&flow->flowlock, "MALworker");
                for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending &&
                                flow->status[i].blocks == 1) {
@@ -320,16 +319,11 @@ DFLOWworker(void *t)
                                flow->status[i].blocks = 0;
                                flow->status[i].hotclaim = fe->hotclaim;
                                flow->status[i].argclaim += fe->hotclaim;
-                               if (fnxt) {
-                                       if ( flow->error)
-                                               q_enqueue(flow->done, fnxt);
-                                       else
-                                               q_enqueue(todo, fnxt);
-                               }
                                fnxt = flow->status + i;
+                               break;
                        }
-               MT_lock_unset(&flow->flowlock, "MALworker");
 
+               fe->state = DFLOWwrapup;
                q_enqueue(flow->done, fe);
                MALresourceFairness(flow->cntxt, flow->mb, usec);
        }
@@ -535,7 +529,7 @@ DFLOWscheduler(DataFlow flow)
                                flow->status[i].argclaim += f->hotclaim;
                                if (flow->status[i].blocks == 1 ) {
                                        flow->status[i].state = DFLOWrunning;
-                                       flow->status[i].blocks--;
+                                       flow->status[i].blocks= 0;
                                        q_enqueue(todo, flow->status + i);
                                        PARDEBUG
                                        mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to