Changeset: eb1c483489cb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=eb1c483489cb
Modified Files:
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
Branch: iot
Log Message:

Ensure proper properties and use global debugging stream


diffs (truncated from 334 to 300 lines):

diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -122,7 +122,7 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
 
        baskets[idx].schema_name = GDKstrdup(s->base.name);
        baskets[idx].table_name = GDKstrdup(t->base.name);
-       baskets[idx].seen = * timestamp_nil;
+       (void) MTIMEcurrent_timestamp(&baskets[idx].seen);
 
        baskets[idx].status = BSKTWAIT;
        baskets[idx].count = 0;
@@ -537,6 +537,7 @@ BSKTtumbleInternal(Client cntxt, str sch
                if( BATcount(b) == 0){
                        baskets[bskt].status = BSKTWAIT;
                }
+               BATderiveProps(b, FALSE);
        }
        return MAL_SUCCEED;
 }
@@ -667,7 +668,7 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
                else
                        BUNappend(bn, value, TRUE);
                cnt = BATcount(bn);
-               //BATderiveProps(bn, FALSE);
+               BATderiveProps(bn, FALSE);
        } else throw(SQL, "basket.append", "Cannot access target column %s.%s.%s",sname,tname,cname);
        
        if(cnt){
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -29,7 +29,6 @@
 #include "petrinet.h"
 
 MT_Lock iotLock MT_LOCK_INITIALIZER("iotLock");
-#define IOTout mal_clients[1].fdout
 
 // locate the SQL procedure in the catalog
 static str
@@ -176,13 +175,13 @@ IOTreceptorThread(void *dummy)
        if( cntxt == NULL)
                return;
        //SQLinitClient(cntxt);
-    _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n",
+    _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s started for %s\n",
                baskets[idx].schema_name, 
                baskets[idx].table_name, 
                baskets[idx].source);
        /* continously scan the container for baskets */
                BSKTimportInternal(cntxt, idx);
-    _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the  file\n",
+    _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s imported the  file\n",
                baskets[idx].schema_name, 
                baskets[idx].table_name);
 }
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -78,7 +78,7 @@ typedef struct {
        lng time;       /* total time spent for all invocations */
 } PNnode;
 
-PNnode pnet[MAXPN];
+PNnode pnet[MAXPN]={0};
 int pnettop = 0;
 
 int enabled[MAXPN];     /*array that contains the id's of all queries that are enable to fire*/
@@ -138,7 +138,7 @@ PNregisterInternal(Client cntxt, MalBlkP
        Symbol s;
        char buf[IDLENGTH];
 
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#registerInternal status %d\n", init);
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n", init);
        if (pnettop == MAXPN) 
                GDKerror("petrinet.register:Too many transitions");
 
@@ -204,13 +204,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
                        throw(SQL,"iot.pause","Continuous query not found");
                }
                pnet[i].status = newstatus;
-               _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s %s\n", modname,fcnname, statusname[newstatus]);
+               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname,fcnname, statusname[newstatus]);
                MT_lock_unset(&iotLock);
                return MAL_SUCCEED;
        }
        for ( i = 0; i < pnettop; i++){
                pnet[i].status = newstatus;
-               _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s  %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s  %s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
        }
        MT_lock_unset(&iotLock);
        return MAL_SUCCEED;
@@ -218,13 +218,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
 
 str
 PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#resume scheduler \n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler \n");
        return PNstatus(cntxt, mb, stk, pci, PNWAIT);
 }
 
 str
 PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#pause scheduler \n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#pause scheduler \n");
        return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
 }
 
@@ -244,7 +244,7 @@ PNwait(Client cntxt, MalBlkPtr mb, MalSt
 str
 PNstop(void){
        int i,cnt;
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n");
 
        pnstatus = PNSTOP; // avoid starting new continuous queries
        for(cnt=0,  i = 0; i < pnettop; i++)
@@ -257,7 +257,7 @@ PNstop(void){
                        cnt += pnet[i].status != PNWAIT;
        } while(cnt);
        BSKTclean(0);
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#all queries stopped \n");
        return MAL_SUCCEED;
 }
 
@@ -287,7 +287,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
                        pnet[i]= pnet[i+1];
                memset((void*) (pnet+i), 0, sizeof(PNnode));
                pnettop--;
-               _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered %s.%s\n", modname,fcnname);
+               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n", modname,fcnname);
                MT_lock_unset(&iotLock);
                return MAL_SUCCEED;
        }
@@ -298,14 +298,14 @@ PNderegister(Client cntxt, MalBlkPtr mb,
                memset((void*) (pnet+i), 0, sizeof(PNnode));
        }
        pnettop = 0;
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered all\n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n");
        MT_lock_unset(&iotLock);
        return MAL_SUCCEED;
 }
 
 str
 PNcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler cycles set \n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler cycles set \n");
        (void) cntxt;
        (void) mb;
        (void) stk;
@@ -318,22 +318,22 @@ PNcycles(Client cntxt, MalBlkPtr mb, Mal
 str PNdump(void *ret)
 {
        int i, k, idx;
-       mnstr_printf(PNout, "#scheduler status %s\n", statusname[pnstatus]);
+       mnstr_printf(GDKout, "#scheduler status %s\n", statusname[pnstatus]);
        for (i = 0; i < pnettop; i++) {
-               mnstr_printf(PNout, "#[%d]\t%s.%s %s delay %d cycles %d events %d time " LLFMT " ms\n",
+               mnstr_printf(GDKout, "#[%d]\t%s.%s %s delay %d cycles %d events %d time " LLFMT " ms\n",
                                i, pnet[i].modname, pnet[i].fcnname, statusname[pnet[i].status], pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000);
                if (pnet[i].error)
-                       mnstr_printf(PNout, "#%s\n", pnet[i].error);
+                       mnstr_printf(GDKout, "#%s\n", pnet[i].error);
                for (k = 0; k < MAXBSKT && pnet[i].inputs[k]; k++){
                        idx = pnet[i].inputs[k];
-                       mnstr_printf(PNout, "#<--\t%s basket "BUNFMT" %d\n",
+                       mnstr_printf(GDKout, "#<--\t%s basket "BUNFMT" %d\n",
                                        baskets[idx].table_name,
                                        baskets[idx].count,
                                        baskets[idx].status);
                }
                for (k = 0; k <MAXBSKT &&  pnet[i].outputs[k]; k++){
                        idx = pnet[i].outputs[k];
-                       mnstr_printf(PNout, "#-->\t%s basket "BUNFMT" %d\n",
+                       mnstr_printf(GDKout, "#-->\t%s basket "BUNFMT" %d\n",
                                        baskets[idx].table_name,
                                        baskets[idx].count,
                                        baskets[idx].status);
@@ -399,19 +399,19 @@ PNexecute( void *n)
        str msg=  MAL_SUCCEED;
        lng t = GDKusec();
 
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s\n",node->modname, node->fcnname);
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s\n",node->modname, node->fcnname);
        // first grab exclusive access to all streams.
        for (j = 0; j < MAXBSKT &&  node->inputs[j]; j++) 
                MT_lock_set(&baskets[node->inputs[j]].lock);
        for (j = 0; j < MAXBSKT &&  node->outputs[j]; j++) 
                MT_lock_set(&baskets[node->outputs[j]].lock);
 
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname);
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all locked\n",node->modname, node->fcnname);
 
        msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0);
 
        _DEBUG_PETRINET_ 
-               mnstr_printf(PNout, "#petrinet.execute %s.%s transition done:%s\n",
+               mnstr_printf(GDKout, "#petrinet.execute %s.%s transition done:%s\n",
                node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:""));
 
        // empty the baskets according to their policy
@@ -421,7 +421,7 @@ PNexecute( void *n)
                MT_lock_unset(&baskets[node->outputs[j]].lock);
 
        pnet[node->inputs[0]].time += GDKusec() - t;   /* keep around in microseconds */
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname);
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname);
        node->status = PNWAIT;
 }
 
@@ -437,7 +437,7 @@ PNscheduler(void *dummy)
        char claimed[MAXBSKT];
        timestamp ts, tn;
 
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
+       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.controller started\n");
        cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/
        if( cntxt){
                if( SQLinitClient(cntxt) != MAL_SUCCEED)
@@ -469,7 +469,7 @@ PNscheduler(void *dummy)
                        // check if all input baskets are available and non-empty
                        for (j = 0; j < MAXBSKT &&  pnet[i].enabled && pnet[i].inputs[j]; j++) {
                                idx = pnet[i].inputs[j];
-                               if (baskets[idx].status != BSKTFILLED ){
+                               if (baskets[idx].status != BSKTFILLED  && baskets[idx].heartbeat == 0 ){
                                        pnet[i].enabled = 0;
                                        break;
                                }
@@ -479,6 +479,7 @@ PNscheduler(void *dummy)
                                        (void) MTIMEtimestamp_add(&tn, &baskets[idx].seen, &baskets[idx].heartbeat);
                                        if (tn.days < ts.days || (tn.days == ts.days && tn.msecs < ts.msecs)) {
                                                pnet[i].enabled = 0;
+                                               PNcycle--; // it does not count as a valid cycle.
                                                break;
                                        }
                                } else
@@ -493,13 +494,13 @@ PNscheduler(void *dummy)
                                /* a basket can be used in at most one continuous query at a time */
                                for (j = 0; j < MAXBSKT &&  pnet[i].enabled && pnet[i].inputs[j]; j++) 
                                        if( claimed[pnet[i].inputs[j]]){
-                                               _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname);
+                                               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname);
                                                pnet[i].enabled = 0;
                                                break;
                                        } 
                                for (j = 0; j < MAXBSKT &&  pnet[i].enabled && pnet[i].outputs[j]; j++) 
                                        if( claimed[pnet[i].outputs[j]]){
-                                               _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname);
+                                               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n", pnet[i].modname, pnet[i].fcnname);
                                                pnet[i].enabled = 0;
                                                break;
                                        } 
@@ -514,7 +515,7 @@ PNscheduler(void *dummy)
 
                                /*save the ids of all continuous queries that can be executed */
                                enabled[k++] = i;
-                               _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
+                               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
                        } 
                        pntasks += pnet[i].enabled;
                }
@@ -525,7 +526,7 @@ PNscheduler(void *dummy)
                for (m = 0; m < k; m++) {
                        i = enabled[m];
                        if (pnet[i].enabled ) {
-                               _DEBUG_PETRINET_ mnstr_printf(PNout, "#Run transition %s \n", pnet[i].fcnname);
+                               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Run transition %s \n", pnet[i].fcnname);
 
                                t = GDKusec();
                                // Fork MAL execution thread 
@@ -553,22 +554,22 @@ PNscheduler(void *dummy)
                        }
                }
                /* after one sweep all threads should be released */
-               for (m = 0; m < k; m++) {
-                       _DEBUG_PETRINET_ mnstr_printf(PNout, "#Terminate query thread %s \n", pnet[i].fcnname);
-                       MT_join_thread(pnet[i].tid);
+               for (m = 0; m < k; m++)
+               if(pnet[enabled[m]].tid){
+                       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Terminate query thread %s \n", pnet[enabled[m]].fcnname);
+                       MT_join_thread(pnet[enabled[m]].tid);
                }
 
                _DEBUG_PETRINET_ if (pnstatus == PNRUNNING && cycleDelay) MT_sleep_ms(cycleDelay);  /* delay to make it more tractable */
                MT_sleep_ms(2);  
                while (pnstatus == PNPAUSED)    { /* scheduler is paused */
                        MT_sleep_ms(cycleDelay);  
-                       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller paused\n");
+                       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.controller paused\n");
                }
 
        }
        MCcloseClient(cntxt);
        pnstatus = PNINIT;
-       _DEBUG_PETRINET_ mnstr_flush(PNout);
        (void) dummy;
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to