Changeset: eb1c483489cb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=eb1c483489cb
Modified Files:
	sql/backends/monet5/iot/basket.c
	sql/backends/monet5/iot/iot.c
	sql/backends/monet5/iot/petrinet.c
	sql/backends/monet5/iot/petrinet.h
Branch: iot
Log Message:
Ensure proper properties and use global debugging stream diffs (truncated from 334 to 300 lines): diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -122,7 +122,7 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql baskets[idx].schema_name = GDKstrdup(s->base.name); baskets[idx].table_name = GDKstrdup(t->base.name); - baskets[idx].seen = * timestamp_nil; + (void) MTIMEcurrent_timestamp(&baskets[idx].seen); baskets[idx].status = BSKTWAIT; baskets[idx].count = 0; @@ -537,6 +537,7 @@ BSKTtumbleInternal(Client cntxt, str sch if( BATcount(b) == 0){ baskets[bskt].status = BSKTWAIT; } + BATderiveProps(b, FALSE); } return MAL_SUCCEED; } @@ -667,7 +668,7 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M else BUNappend(bn, value, TRUE); cnt = BATcount(bn); - //BATderiveProps(bn, FALSE); + BATderiveProps(bn, FALSE); } else throw(SQL, "basket.append", "Cannot access target column %s.%s.%s",sname,tname,cname); if(cnt){ diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c --- a/sql/backends/monet5/iot/iot.c +++ b/sql/backends/monet5/iot/iot.c @@ -29,7 +29,6 @@ #include "petrinet.h" MT_Lock iotLock MT_LOCK_INITIALIZER("iotLock"); -#define IOTout mal_clients[1].fdout // locate the SQL procedure in the catalog static str @@ -176,13 +175,13 @@ IOTreceptorThread(void *dummy) if( cntxt == NULL) return; //SQLinitClient(cntxt); - _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n", + _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s started for %s\n", baskets[idx].schema_name, baskets[idx].table_name, baskets[idx].source); /* continously scan the container for baskets */ BSKTimportInternal(cntxt, idx); - _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the file\n", + _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s imported the file\n", baskets[idx].schema_name, baskets[idx].table_name); } diff --git 
a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -78,7 +78,7 @@ typedef struct { lng time; /* total time spent for all invocations */ } PNnode; -PNnode pnet[MAXPN]; +PNnode pnet[MAXPN]={0}; int pnettop = 0; int enabled[MAXPN]; /*array that contains the id's of all queries that are enable to fire*/ @@ -138,7 +138,7 @@ PNregisterInternal(Client cntxt, MalBlkP Symbol s; char buf[IDLENGTH]; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#registerInternal status %d\n", init); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n", init); if (pnettop == MAXPN) GDKerror("petrinet.register:Too many transitions"); @@ -204,13 +204,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma throw(SQL,"iot.pause","Continuous query not found"); } pnet[i].status = newstatus; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s %s\n", modname,fcnname, statusname[newstatus]); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname,fcnname, statusname[newstatus]); MT_lock_unset(&iotLock); return MAL_SUCCEED; } for ( i = 0; i < pnettop; i++){ pnet[i].status = newstatus; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]); } MT_lock_unset(&iotLock); return MAL_SUCCEED; @@ -218,13 +218,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma str PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#resume scheduler \n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler \n"); return PNstatus(cntxt, mb, stk, pci, PNWAIT); } str PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#pause scheduler \n"); + _DEBUG_PETRINET_ 
mnstr_printf(GDKout, "#pause scheduler \n"); return PNstatus(cntxt, mb, stk, pci, PNPAUSED); } @@ -244,7 +244,7 @@ PNwait(Client cntxt, MalBlkPtr mb, MalSt str PNstop(void){ int i,cnt; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n"); pnstatus = PNSTOP; // avoid starting new continuous queries for(cnt=0, i = 0; i < pnettop; i++) @@ -257,7 +257,7 @@ PNstop(void){ cnt += pnet[i].status != PNWAIT; } while(cnt); BSKTclean(0); - _DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#all queries stopped \n"); return MAL_SUCCEED; } @@ -287,7 +287,7 @@ PNderegister(Client cntxt, MalBlkPtr mb, pnet[i]= pnet[i+1]; memset((void*) (pnet+i), 0, sizeof(PNnode)); pnettop--; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered %s.%s\n", modname,fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n", modname,fcnname); MT_lock_unset(&iotLock); return MAL_SUCCEED; } @@ -298,14 +298,14 @@ PNderegister(Client cntxt, MalBlkPtr mb, memset((void*) (pnet+i), 0, sizeof(PNnode)); } pnettop = 0; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered all\n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n"); MT_lock_unset(&iotLock); return MAL_SUCCEED; } str PNcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler cycles set \n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler cycles set \n"); (void) cntxt; (void) mb; (void) stk; @@ -318,22 +318,22 @@ PNcycles(Client cntxt, MalBlkPtr mb, Mal str PNdump(void *ret) { int i, k, idx; - mnstr_printf(PNout, "#scheduler status %s\n", statusname[pnstatus]); + mnstr_printf(GDKout, "#scheduler status %s\n", statusname[pnstatus]); for (i = 0; i < pnettop; i++) { - mnstr_printf(PNout, "#[%d]\t%s.%s %s delay %d cycles %d events %d time " LLFMT " ms\n", + mnstr_printf(GDKout, 
"#[%d]\t%s.%s %s delay %d cycles %d events %d time " LLFMT " ms\n", i, pnet[i].modname, pnet[i].fcnname, statusname[pnet[i].status], pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000); if (pnet[i].error) - mnstr_printf(PNout, "#%s\n", pnet[i].error); + mnstr_printf(GDKout, "#%s\n", pnet[i].error); for (k = 0; k < MAXBSKT && pnet[i].inputs[k]; k++){ idx = pnet[i].inputs[k]; - mnstr_printf(PNout, "#<--\t%s basket "BUNFMT" %d\n", + mnstr_printf(GDKout, "#<--\t%s basket "BUNFMT" %d\n", baskets[idx].table_name, baskets[idx].count, baskets[idx].status); } for (k = 0; k <MAXBSKT && pnet[i].outputs[k]; k++){ idx = pnet[i].outputs[k]; - mnstr_printf(PNout, "#-->\t%s basket "BUNFMT" %d\n", + mnstr_printf(GDKout, "#-->\t%s basket "BUNFMT" %d\n", baskets[idx].table_name, baskets[idx].count, baskets[idx].status); @@ -399,19 +399,19 @@ PNexecute( void *n) str msg= MAL_SUCCEED; lng t = GDKusec(); - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s\n",node->modname, node->fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s\n",node->modname, node->fcnname); // first grab exclusive access to all streams. 
for (j = 0; j < MAXBSKT && node->inputs[j]; j++) MT_lock_set(&baskets[node->inputs[j]].lock); for (j = 0; j < MAXBSKT && node->outputs[j]; j++) MT_lock_set(&baskets[node->outputs[j]].lock); - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname); msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0); _DEBUG_PETRINET_ - mnstr_printf(PNout, "#petrinet.execute %s.%s transition done:%s\n", + mnstr_printf(GDKout, "#petrinet.execute %s.%s transition done:%s\n", node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:"")); // empty the baskets according to their policy @@ -421,7 +421,7 @@ PNexecute( void *n) MT_lock_unset(&baskets[node->outputs[j]].lock); pnet[node->inputs[0]].time += GDKusec() - t; /* keep around in microseconds */ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname); node->status = PNWAIT; } @@ -437,7 +437,7 @@ PNscheduler(void *dummy) char claimed[MAXBSKT]; timestamp ts, tn; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.controller started\n"); cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/ if( cntxt){ if( SQLinitClient(cntxt) != MAL_SUCCEED) @@ -469,7 +469,7 @@ PNscheduler(void *dummy) // check if all input baskets are available and non-empty for (j = 0; j < MAXBSKT && pnet[i].enabled && pnet[i].inputs[j]; j++) { idx = pnet[i].inputs[j]; - if (baskets[idx].status != BSKTFILLED ){ + if (baskets[idx].status != BSKTFILLED && baskets[idx].heartbeat == 0 ){ pnet[i].enabled = 0; break; } @@ -479,6 +479,7 @@ PNscheduler(void *dummy) (void) MTIMEtimestamp_add(&tn, &baskets[idx].seen, &baskets[idx].heartbeat); if (tn.days < ts.days || 
(tn.days == ts.days && tn.msecs < ts.msecs)) { pnet[i].enabled = 0; + PNcycle--; // it does not count as a valid cycle. break; } } else @@ -493,13 +494,13 @@ PNscheduler(void *dummy) /* a basket can be used in at most one continuous query at a time */ for (j = 0; j < MAXBSKT && pnet[i].enabled && pnet[i].inputs[j]; j++) if( claimed[pnet[i].inputs[j]]){ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname); pnet[i].enabled = 0; break; } for (j = 0; j < MAXBSKT && pnet[i].enabled && pnet[i].outputs[j]; j++) if( claimed[pnet[i].outputs[j]]){ - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname); pnet[i].enabled = 0; break; } @@ -514,7 +515,7 @@ PNscheduler(void *dummy) /*save the ids of all continuous queries that can be executed */ enabled[k++] = i; - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname); } pntasks += pnet[i].enabled; } @@ -525,7 +526,7 @@ PNscheduler(void *dummy) for (m = 0; m < k; m++) { i = enabled[m]; if (pnet[i].enabled ) { - _DEBUG_PETRINET_ mnstr_printf(PNout, "#Run transition %s \n", pnet[i].fcnname); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Run transition %s \n", pnet[i].fcnname); t = GDKusec(); // Fork MAL execution thread @@ -553,22 +554,22 @@ PNscheduler(void *dummy) } } /* after one sweep all threads should be released */ - for (m = 0; m < k; m++) { - _DEBUG_PETRINET_ mnstr_printf(PNout, "#Terminate query thread %s \n", pnet[i].fcnname); - MT_join_thread(pnet[i].tid); + for (m = 0; m < k; m++) + 
if(pnet[enabled[m]].tid){ + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Terminate query thread %s \n", pnet[enabled[m]].fcnname); + MT_join_thread(pnet[enabled[m]].tid); } _DEBUG_PETRINET_ if (pnstatus == PNRUNNING && cycleDelay) MT_sleep_ms(cycleDelay); /* delay to make it more tractable */ MT_sleep_ms(2); while (pnstatus == PNPAUSED) { /* scheduler is paused */ MT_sleep_ms(cycleDelay); - _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller paused\n"); + _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.controller paused\n"); } } MCcloseClient(cntxt); pnstatus = PNINIT; - _DEBUG_PETRINET_ mnstr_flush(PNout); (void) dummy; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list