Changeset: 717c2f042c77 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=717c2f042c77
Modified Files:
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

Intermittent commit
Still cleaning up.


diffs (122 lines):

diff --git a/sql/backends/monet5/iot/Tests/iot00.sql 
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -10,7 +10,7 @@ begin
 end;
 
 call iot.query('iot','cq00');
-call iot.query('insert into result select min(t), count(*), avg(val) from 
stream_tmp;');
+call iot.query('insert into iot.result select min(t), count(*), avg(val) from 
iot.stream_tmp;');
 
 select * from  iot.baskets();
 select * from  iot.queries();
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -91,28 +91,28 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
                nme = *getArgReference_str(stk, pci, 2);
                /* check existence of the pre-compiled function */
                _DEBUG_IOT_ fprintf(stderr,"#iot: locate a SQL procedure 
%s.%s()\n",sch,nme);
+               msg = IOTprocedureStmt(cntxt, mb, sch, nme);
+               if (msg)
+                       return msg;
+               s = findSymbolInModule(cntxt->nspace, putName(nme, 
strlen(nme)));
+               if (s == NULL)
+                       throw(SQL, "iot.query", "Definition missing");
+               qry = s->def;
        } else if (pci->argc == 2){
                // pre-create the new procedure
-               sch = "iot";
+               sch = "user";
                snprintf(name, IDLENGTH,"cquery_%d",iotquerycnt++);
                def = *getArgReference_str(stk, pci, 1);
                // package it as a procedure in the current schema [todo]
-               snprintf(buf,BUFSIZ,"create procedure %s() begin %s; 
end",name,def);
+               snprintf(buf,BUFSIZ,"create procedure %s.%s() begin %s; 
end",sch,name,def);
                _DEBUG_IOT_ fprintf(stderr,"#iot.compile: %s\n",buf);
                nme = name;
                msg = SQLstatementIntern(cntxt, &def, nme, 1, 0, 0);
                if (msg)
                        return msg;
+               qry = cntxt->curprg->def;
        }
 
-       msg = IOTprocedureStmt(cntxt, mb, sch, nme);
-       if (msg)
-               return msg;
-       s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme)));
-       if (s == NULL)
-               throw(SQL, "iot.query", "Definition missing");
-       qry = s->def;
-
        _DEBUG_IOT_ fprintf(stderr,"#iot: bake a new continuous query plan\n");
        scope = findModule(cntxt->nspace, putName(sch, strlen(sch)));
        s = newFunction(putName(sch, strlen(sch)), putName(nme, strlen(nme)), 
FUNCTIONsymbol);
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -82,7 +82,7 @@ int pnettop = 0;
 int enabled[MAXPN];     /* array that contains the ids of all queries that are 
enabled to fire */
 
 static int status = BSKTINIT;
-static int cycleDelay = 1; /* be careful, it affects response/throughput 
timings */
+static int cycleDelay = 1000; /* be careful, it affects response/throughput 
timings */
 
 str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -129,6 +129,7 @@ PNlocate(str modname, str fcnname)
                        break;
        return i;
 }
+
 /* A transition is only allowed when all inputs are privately used */
 str
 PNregisterInternal(Client cntxt, MalBlkPtr mb)
@@ -148,6 +149,8 @@ PNregisterInternal(Client cntxt, MalBlkP
 
        pnet[pnettop].modname = GDKstrdup(getModuleId(sig));
        pnet[pnettop].fcnname = GDKstrdup(getFunctionId(sig));
+       pnet[pnettop].mb = mb;
+       pnet[pnettop].stk = prepareMALstack(mb, mb->vsize);
 
        pnet[pnettop].status = BSKTPAUSE;
        pnet[pnettop].cycles = 0;
@@ -283,7 +286,7 @@ PNexecute( void *n)
 {
        PNnode *node= (PNnode *) n;
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed 
%s.%s\n",node->modname, node->fcnname);
-       runMAL(mal_clients, node->mb, 0,0);
+       runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
        node->status = BSKTPAUSE;
 }
 
@@ -306,10 +309,13 @@ PNcontroller(void *dummy)
        status = BSKTRUNNING;
 
        while( status != BSKTSTOP && pnettop > 0){
+               _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller 
step\n");
                if (cycleDelay)
                        MT_sleep_ms(cycleDelay);  /* delay to make it more 
tractable */
-               while (status == BSKTPAUSE)     /* scheduler is paused */
+               while (status == BSKTPAUSE)     { /* scheduler is paused */
                        MT_sleep_ms(cycleDelay);  
+                       _DEBUG_PETRINET_ mnstr_printf(PNout, 
"#petrinet.controller paused\n");
+               }
 
                /* collect latest statistics, note that we don't need a lock 
here,
                   because the count need not be accurate to the usec. It will 
simply
@@ -393,10 +399,12 @@ PNcontroller(void *dummy)
                                }
                        }
                }
+               /* after one sweep all threads should be released */
+               for (m = 0; m < k; m++) {
+                       MT_join_thread(pnet[i].tid);
+               }
        }
-       MT_lock_set(&iotLock);
        status = BSKTINIT;
-       MT_lock_unset(&iotLock);
        _DEBUG_PETRINET_ mnstr_flush(PNout);
        (void) dummy;
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to