Changeset: 717c2f042c77 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=717c2f042c77
Modified Files:
	sql/backends/monet5/iot/Tests/iot00.sql
	sql/backends/monet5/iot/iot.c
	sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
Intermittent commit Still cleaning up. diffs (122 lines): diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -10,7 +10,7 @@ begin end; call iot.query('iot','cq00'); -call iot.query('insert into result select min(t), count(*), avg(val) from stream_tmp;'); +call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;'); select * from iot.baskets(); select * from iot.queries(); diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c --- a/sql/backends/monet5/iot/iot.c +++ b/sql/backends/monet5/iot/iot.c @@ -91,28 +91,28 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal nme = *getArgReference_str(stk, pci, 2); /* check existing of the pre-compiled function */ _DEBUG_IOT_ fprintf(stderr,"#iot: locate a SQL procedure %s.%s()\n",sch,nme); + msg = IOTprocedureStmt(cntxt, mb, sch, nme); + if (msg) + return msg; + s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme))); + if (s == NULL) + throw(SQL, "iot.query", "Definition missing"); + qry = s->def; } else if (pci->argc == 2){ // pre-create the new procedure - sch = "iot"; + sch = "user"; snprintf(name, IDLENGTH,"cquery_%d",iotquerycnt++); def = *getArgReference_str(stk, pci, 1); // package it as a procedure in the current schema [todo] - snprintf(buf,BUFSIZ,"create procedure %s() begin %s; end",name,def); + snprintf(buf,BUFSIZ,"create procedure %s.%s() begin %s; end",sch,name,def); _DEBUG_IOT_ fprintf(stderr,"#iot.compile: %s\n",buf); nme = name; msg = SQLstatementIntern(cntxt, &def, nme, 1, 0, 0); if (msg) return msg; + qry = cntxt->curprg->def; } - msg = IOTprocedureStmt(cntxt, mb, sch, nme); - if (msg) - return msg; - s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme))); - if (s == NULL) - throw(SQL, "iot.query", "Definition missing"); - qry = s->def; - _DEBUG_IOT_ fprintf(stderr,"#iot: bake a new continuous query 
plan\n"); scope = findModule(cntxt->nspace, putName(sch, strlen(sch))); s = newFunction(putName(sch, strlen(sch)), putName(nme, strlen(nme)), FUNCTIONsymbol); diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c --- a/sql/backends/monet5/iot/petrinet.c +++ b/sql/backends/monet5/iot/petrinet.c @@ -82,7 +82,7 @@ int pnettop = 0; int enabled[MAXPN]; /*array that contains the id's of all queries that are enable to fire*/ static int status = BSKTINIT; -static int cycleDelay = 1; /* be careful, it affects response/throughput timings */ +static int cycleDelay = 1000; /* be careful, it affects response/throughput timings */ str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -129,6 +129,7 @@ PNlocate(str modname, str fcnname) break; return i; } + /* A transition is only allowed when all inputs are privately used */ str PNregisterInternal(Client cntxt, MalBlkPtr mb) @@ -148,6 +149,8 @@ PNregisterInternal(Client cntxt, MalBlkP pnet[pnettop].modname = GDKstrdup(getModuleId(sig)); pnet[pnettop].fcnname = GDKstrdup(getFunctionId(sig)); + pnet[pnettop].mb = mb; + pnet[pnettop].stk = prepareMALstack(mb, mb->vsize); pnet[pnettop].status = BSKTPAUSE; pnet[pnettop].cycles = 0; @@ -283,7 +286,7 @@ PNexecute( void *n) { PNnode *node= (PNnode *) n; _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed %s.%s\n",node->modname, node->fcnname); - runMAL(mal_clients, node->mb, 0,0); + runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0); node->status = BSKTPAUSE; } @@ -306,10 +309,13 @@ PNcontroller(void *dummy) status = BSKTRUNNING; while( status != BSKTSTOP && pnettop > 0){ + _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller step\n"); if (cycleDelay) MT_sleep_ms(cycleDelay); /* delay to make it more tractable */ - while (status == BSKTPAUSE) /* scheduler is paused */ + while (status == BSKTPAUSE) { /* scheduler is paused */ MT_sleep_ms(cycleDelay); + _DEBUG_PETRINET_ mnstr_printf(PNout, 
"#petrinet.controller paused\n"); + } /* collect latest statistics, note that we don't need a lock here, because the count need not be accurate to the usec. It will simply @@ -393,10 +399,12 @@ PNcontroller(void *dummy) } } } + /* after one sweep all threads should be released */ + for (m = 0; m < k; m++) { + MT_join_thread(pnet[i].tid); + } } - MT_lock_set(&iotLock); status = BSKTINIT; - MT_lock_unset(&iotLock); _DEBUG_PETRINET_ mnstr_flush(PNout); (void) dummy; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list