Changeset: 1bd3bc7e199b for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1bd3bc7e199b Modified Files: sql/backends/monet5/datacell/petrinet.c Branch: default Log Message:
Fix multi-threaded execution. The continuous queries are now properly included in the control function. diffs (160 lines): diff --git a/sql/backends/monet5/datacell/petrinet.c b/sql/backends/monet5/datacell/petrinet.c --- a/sql/backends/monet5/datacell/petrinet.c +++ b/sql/backends/monet5/datacell/petrinet.c @@ -78,12 +78,10 @@ #define PNcontrolInfinit 1 /* infinit loop of PNController */ #define PNcontrolEnd 2 /* when all factories are disable PNController exits */ -/* #define _DEBUG_PETRINET_ */ +#define _DEBUG_PETRINET_ /*static int controlRounds = PNcontrolInfinit;*/ -static MT_Lock petriLock; - typedef struct { char *table; int bskt; /* basket used */ @@ -193,7 +191,7 @@ str PNregister(Client cntxt, MalBlkPtr m pnettop++; msg = PNanalysis(cntxt, s->def); /* start the scheduler if analysis does not show errors */ - if ( msg == MAL_SUCCEED && status == BSKTINIT) + if ( msg == MAL_SUCCEED ) return PNstartThread(ret); return msg; } @@ -286,9 +284,9 @@ str PNstopScheduler(int *ret) { int i = 0, j = pnettop; pnettop = 0; /* don't look at it anymore */ - MT_lock_set(&petriLock, "pncontroller"); + MT_lock_set(&dcLock, "pncontroller"); status = BSKTSTOP; - MT_lock_unset(&petriLock, "pncontroller"); + MT_lock_unset(&dcLock, "pncontroller"); i = 0; do { MT_sleep_ms(cycleDelay + 1); /* delay to make it more tractable */ @@ -309,7 +307,9 @@ str PNresumeScheduler(int *ret) for( i =0; i< pnettop; i++) pnet[i].status = BSKTRUNNING; + MT_lock_set(&dcLock, "pncontroller"); status = BSKTRUNNING; + MT_lock_unset(&dcLock, "pncontroller"); (void) ret; return MAL_SUCCEED; } @@ -320,7 +320,9 @@ str PNpauseScheduler(int *ret) for( i =0; i< pnettop; i++) pnet[i].status = BSKTPAUSE; + MT_lock_set(&dcLock, "pncontroller"); status = BSKTPAUSE; + MT_lock_unset(&dcLock, "pncontroller"); (void) ret; return MAL_SUCCEED; } @@ -467,6 +469,7 @@ PNcontroller(void *dummy) int m = 0; str msg; lng t, analysis, now; + char buf[BUFSIZ], *modnme, *fcnnme; cntxt = mal_clients; /* run as admin */ SQLinitEnvironment(cntxt); @@ -479,17 +482,17 @@ PNcontroller(void *dummy) return; } - /* create the lock */ - MT_lock_init(&petriLock, "petrinet"); - /* create a fake procedure to highlight the continuous queries */ - s = newFunction("user", "pnController", FACTORYsymbol); + s = newFunction(userRef, "pnController", FUNCTIONsymbol); + mb= s->def; p = getSignature(s); - getArg(p, 0) = newTmpVariable(mb = s->def, TYPE_void); + getArg(p, 0) = newTmpVariable(mb, TYPE_void); +reinit: + MT_lock_set(&dcLock, "pncontroller"); + status = BSKTRUNNING; + mb->stop = 1; /* create an execution environment for all transitions */ for (i = 0; i < pnettop; i++) { - char buf[BUFSIZ], *modnme, *fcnnme; - BSKTelements(pnet[i].name, buf, &modnme, &fcnnme); BSKTtolower(modnme); BSKTtolower(fcnnme); @@ -503,6 +506,7 @@ PNcontroller(void *dummy) mnstr_printf(cntxt->fdout, "#Petrinet Controller found errors\n"); return; } + MT_lock_unset(&dcLock, "pncontroller"); newStack(glb, mb->vtop); memset((char *) glb, 0, stackSize(mb->vtop)); glb->stktop = mb->vtop; @@ -510,19 +514,17 @@ PNcontroller(void *dummy) #ifdef _DEBUG_PETRINET_ printFunction(cntxt->fdout, mb, 0, LIST_MAL_ALL); #endif - do { + while( status != BSKTSTOP){ if (cycleDelay) MT_sleep_ms(cycleDelay); /* delay to make it more tractable */ while (status == BSKTPAUSE) ; - MT_lock_set(&petriLock, "pncontroller"); - if (status != BSKTSTOP) - /* collect latest statistics, note that we don't need a lock here, - because the count need not be accurate to the usec. It will simply - come back. We also only have to check the sources that are marked - empty. */ - status = BSKTRUNNING; - MT_lock_unset(&petriLock, "pncontroller"); + if ( mb->stop < pnettop + 2) + goto reinit; /* new query arrived */ + /* collect latest statistics, note that we don't need a lock here, + because the count need not be accurate to the usec. It will simply + come back. We also only have to check the sources that are marked + empty. */ now = GDKusec(); for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++) if ( pnet[i].status != BSKTPAUSE ){ @@ -608,8 +610,10 @@ PNcontroller(void *dummy) } } } - } while (status != BSKTSTOP); + } + MT_lock_set(&dcLock, "pncontroller"); status = BSKTINIT; + MT_lock_unset(&dcLock, "pncontroller"); (void) dummy; } @@ -621,7 +625,7 @@ str PNstartThread(int *ret) PNdump(&s); #endif - if (MT_create_thread(&pid, PNcontroller, &s, MT_THR_DETACHED) != 0) + if (status== BSKTINIT && MT_create_thread(&pid, PNcontroller, &s, MT_THR_DETACHED) != 0) throw(MAL, "petrinet.startThread", "Process creation failed"); (void) ret; @@ -635,10 +639,10 @@ static str PNstart(int *ret) #ifdef _DEBUG_PETRINET_ PNdump(&s); #endif - MT_lock_set(&petriLock, "pncontroller"); + MT_lock_set(&dcLock, "pncontroller"); if (status != BSKTSTOP) status = BSKTRUNNING; - MT_lock_unset(&petriLock, "pncontroller"); + MT_lock_unset(&dcLock, "pncontroller"); /*controlRounds = PNcontrolEnd;*/ PNcontroller(&s); _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list