Changeset: fd4f14e18be3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fd4f14e18be3
Modified Files:
        sql/backends/monet5/datacell/50_datacell.sql
        sql/backends/monet5/datacell/Tests/scenario05.sql
        sql/backends/monet5/datacell/basket.mx
        sql/backends/monet5/datacell/datacell.mx
        sql/backends/monet5/datacell/petrinet.mx
Branch: default
Log Message:

Add table inspections
The status of the baskets and queries can be inspected
using SQL.


diffs (truncated from 578 to 300 lines):

diff --git a/sql/backends/monet5/datacell/50_datacell.sql 
b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -20,10 +20,6 @@
 create procedure datacell.basket(tbl string)
    external name datacell.basket;
 
-create function datacell.inventory()
-returns table (kind string, nme string)
-   external name datacell.inventory;
-
 create procedure datacell.receptor(tbl string, host string, portid integer)
     external name datacell.receptor;
 
@@ -77,3 +73,14 @@
 create function datacell.beat(bskt string, t integer)
 returns boolean
        external name datacell.beat;
+
+-- Inspection tables
+
+create function datacell.baskets()
+returns table( nme string, threshold int, winsize int, winslide int, beat int,
+       seen timestamp, grabs int, events int)
+external name datacell.baskets;
+
+create function datacell.queries()
+returns table( nme string, status string, cycles int, events int, time bigint, 
error string, def string)
+external name datacell.queries;
diff --git a/sql/backends/monet5/datacell/Tests/scenario05.sql 
b/sql/backends/monet5/datacell/Tests/scenario05.sql
--- a/sql/backends/monet5/datacell/Tests/scenario05.sql
+++ b/sql/backends/monet5/datacell/Tests/scenario05.sql
@@ -1,7 +1,7 @@
 -- Scenario to exercise the datacell implementation
 -- using a single receptor and emitter
 -- this is the extended version of scenario00
--- with sliding beatdow and a 2 seconds delay
+-- with sliding window and a 2 seconds delay
 
 create schema datacell;
 set optimizer='datacell_pipe';
@@ -23,6 +23,9 @@
 call datacell.resume();
 call datacell.dump();
 
+select * from datacell.baskets();
+select * from datacell.queries();
+
 -- externally, activate the sensor 
 --sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1
 -- externally, activate the actuator server to listen
diff --git a/sql/backends/monet5/datacell/basket.mx 
b/sql/backends/monet5/datacell/basket.mx
--- a/sql/backends/monet5/datacell/basket.mx
+++ b/sql/backends/monet5/datacell/basket.mx
@@ -72,6 +72,10 @@
 address BSKTreset
 comment "Remove all baskets";
 
+command baskets():bat[:str,:bat]
+address BSKTtable
+comment "Inspect the datacell baskets";
+
 command dump()
 address BSKTdump
 comment "Dump the status of the basket table";
@@ -105,12 +109,14 @@
        str name;       /* table that represents the basket */
        int threshold ; /* bound to determine scheduling eligibility */
        int winsize, winslide; /* sliding window operations */
-       lng lastseen;
+       lng seen;
        lng beat;       /* milliseconds delay */
        int colcount;
        str *cols;
        BAT **primary;  
        /* statistics */
+       int events; /* total number of events grabbed */
+       int grabs; /* number of grabs */
 } *BSKTbasket, BSKTbasketRec;
 
 datacell_export str schema_default;
@@ -126,6 +132,7 @@
 datacell_export str BSKTthreshold(int *ret, str *tbl, int *sz);
 datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
 datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
+datacell_export str BSKTtable(int *ret);
 
 datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
 datacell_export str BSKTunlock(int *ret, str *tbl);
@@ -150,7 +157,7 @@
 str schema_default = "datacell";
 
 BSKTbasketRec *baskets;   /* the datacell catalog */
-int bsktTop, bsktLimit = 0;
+int bsktTop = 0, bsktLimit = 0;
 static MT_Lock bsktLock;
 
 #define lockBSKTbasketCatalog() mal_set_lock(bsktLock, "basket");
@@ -408,6 +415,7 @@
        str tbl;
        int bskt, i, *ret;
        BAT *b,*bn = 0,*v;
+       int cnt;
 
        (void) cntxt;
        (void) mb;
@@ -435,6 +443,7 @@
                        b = BATsetaccess(b, BAT_WRITE);
                        BATins(b,v, FALSE);
                        BATsetcount(bn, baskets[bskt].winsize);
+                       cnt = (int) BATcount(bn);
                        BBPunfix(v->batCacheid);
                } else {
                        bn = BATcopy(b, b->htype, b->ttype,TRUE);
@@ -449,6 +458,8 @@
                if( bn) BBPunfix( bn->batCacheid);
                throw(MAL,"basket.grab","too early");
        }
+       baskets[bskt].grabs++;
+       baskets[bskt].events += cnt;
        return MAL_SUCCEED;
 }
 
@@ -578,8 +589,89 @@
                throw(MAL,"basket.beat","Illegal value");
        baskets[bskt].beat = *sz;
        *ret = TRUE;
-       if ( baskets[bskt].beat + baskets[bskt].lastseen > GDKusec() )
+       if ( baskets[bskt].beat + baskets[bskt].seen > GDKusec() )
                throw(MAL,"basket.heat","too early");
        return MAL_SUCCEED;
 }
+
+/* provide a tabular view for inspection */
+str
+BSKTtable(int *ret)
+{
+       BAT *bn, *name, *seen, *events, *grabs;
+       BAT *threshold, *winsize, *winslide, *beat;
+       int i;
+
+       bn = BATnew(TYPE_str, TYPE_bat, BATTINY);
+       if ( bn == 0)
+               throw(MAL,"dictionary.baskets",MAL_MALLOC_FAIL);
+
+       name = BATnew(TYPE_oid,TYPE_str, BATTINY);
+       if ( name == 0 ) goto wrapup;
+       BATseqbase(name,0);
+       threshold = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( threshold == 0 ) goto wrapup;
+       BATseqbase(threshold,0);
+       winsize = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( winsize == 0 ) goto wrapup;
+       BATseqbase(winsize,0);
+       winslide = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( winslide == 0 ) goto wrapup;
+       BATseqbase(winslide,0);
+       beat = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( beat == 0 ) goto wrapup;
+       BATseqbase(beat,0);
+       seen = BATnew(TYPE_oid,TYPE_timestamp, BATTINY);
+       if ( seen == 0 ) goto wrapup;
+       BATseqbase(seen,0);
+       grabs = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( grabs == 0 ) goto wrapup;
+       BATseqbase(grabs,0);
+       events = BATnew(TYPE_oid,TYPE_int, BATTINY);
+       if ( events == 0 ) goto wrapup;
+       BATseqbase(events,0);
+
+       for ( i =1; i < bsktTop; i++) 
+       if ( baskets[i].name ) {
+               BUNappend(name, baskets[i].name, FALSE);
+               BUNappend(threshold, &baskets[i].threshold, FALSE);
+               BUNappend(winsize, &baskets[i].winsize, FALSE);
+               BUNappend(winslide, &baskets[i].winslide, FALSE);
+               BUNappend(beat, &baskets[i].beat, FALSE);
+               BUNappend(seen, &baskets[i].seen, FALSE);
+               BUNappend(grabs, &baskets[i].grabs, FALSE);
+               BUNappend(events, &baskets[i].events, FALSE);
+       }
+       BUNins(bn,"nme", & name->batCacheid, FALSE);
+       BUNins(bn,"threshold", & threshold->batCacheid, FALSE);
+       BUNins(bn,"winsize", & winsize->batCacheid, FALSE);
+       BUNins(bn,"winslide", & winslide->batCacheid, FALSE);
+       BUNins(bn,"beat", & beat->batCacheid, FALSE);
+       BUNins(bn,"seen", & seen->batCacheid, FALSE);
+       BUNins(bn,"grabs", & grabs->batCacheid, FALSE);
+       BUNins(bn,"events", & events->batCacheid, FALSE);
+
+       *ret = bn->batCacheid;
+       BBPkeepref(bn->batCacheid);
+       BBPkeepref(name->batCacheid);
+       BBPkeepref(threshold->batCacheid);
+       BBPkeepref(winsize->batCacheid);
+       BBPkeepref(winslide->batCacheid);
+       BBPkeepref(beat->batCacheid);
+       BBPkeepref(seen->batCacheid);
+       BBPkeepref(grabs->batCacheid);
+       BBPkeepref(events->batCacheid);
+       return MAL_SUCCEED;
+wrapup:
+       if ( bn) BBPreleaseref(bn->batCacheid);
+       if ( name) BBPreleaseref(name->batCacheid);
+       if ( threshold) BBPreleaseref(threshold->batCacheid);
+       if ( winsize) BBPreleaseref(winsize->batCacheid);
+       if ( winslide) BBPreleaseref(winslide->batCacheid);
+       if ( beat) BBPreleaseref(beat->batCacheid);
+       if ( seen) BBPreleaseref(seen->batCacheid);
+       if ( grabs) BBPreleaseref(grabs->batCacheid);
+       if ( events) BBPreleaseref(events->batCacheid);
+       throw(MAL,"datacell.baskets",MAL_MALLOC_FAIL);
+}
 @}
diff --git a/sql/backends/monet5/datacell/datacell.mx 
b/sql/backends/monet5/datacell/datacell.mx
--- a/sql/backends/monet5/datacell/datacell.mx
+++ b/sql/backends/monet5/datacell/datacell.mx
@@ -72,10 +72,6 @@
 address DCpostlude
 comment "Stop the petrinet scheduler.";
 
-command inventory():bat[:str,:bat]
-address DCinventory
-comment "Produce a tabular view of the datacell streaming components";
-
 command dump()
 address DCdump
 comment "Dump receptor/emitter status";
@@ -91,6 +87,15 @@
 command beat(bskt:str, t:int):bit
 address DCbeat
 comment "Schedule query with T milliseconds interval";
+
+command baskets():bat[:str,:bat]
+address BSKTtable
+comment "Inspect the datacell baskets";
+
+command queries():bat[:str,:bat]
+address PNtable
+comment "Inspect the datacell queries";
+
 @{
 @h
 #ifndef _DATACELLS_
@@ -137,7 +142,6 @@
 datacell_export str DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci);
 datacell_export str DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
-datacell_export str DCinventory(int *ret);
 
 #endif
 @c
@@ -322,29 +326,6 @@
        return MAL_SUCCEED;
 }
 
-str
-DCinventory(int *ret)
-{
-       BAT *b, *bn;
-       int i;
-       str msg = MAL_SUCCEED;
-
-       b = BATnew(TYPE_str, TYPE_bat, BATTINY);
-       if (b == NULL)
-               throw(SQL, "datacell.inventory", MAL_MALLOC_FAIL);
-       for (i = 0; msg == MAL_SUCCEED && i < bsktTop; i++) {
-               bn = BATdescriptor(BBPindex(baskets[i].name));
-               if (bn == NULL)
-                       msg = createException(SQL, "datacell.inventory", 
MAL_MALLOC_FAIL);
-               else {
-                       BUNins(b, baskets[i].name, &bn->batCacheid, FALSE);
-                       BBPreleaseref(bn->batCacheid);
-               }
-       }
-       *ret = b->batCacheid;
-       BBPkeepref(b->batCacheid);
-       return msg;
-}
 
 /* locate the MAL representation of this operation and extract the flow */
 /* If the operation is not available yet, it should be compiled from its
diff --git a/sql/backends/monet5/datacell/petrinet.mx 
b/sql/backends/monet5/datacell/petrinet.mx
--- a/sql/backends/monet5/datacell/petrinet.mx
+++ b/sql/backends/monet5/datacell/petrinet.mx
@@ -29,17 +29,16 @@
 @end example
 @mal
 module petrinet;
-pattern register(nme:str, delay:int):void
+pattern register(nme:str, def:str):void
 address PNregister
 comment "Add a continous query to the Petri net. It will analyse
-the MAL block to determine the input/output dependencies. 
-The delay provides an indicative upperbound on how fast to react";
+the MAL block to determine the input/output dependencies. ";
 
-command source(fcn:str, tbl:str):void
+command source(nme:str, tbl:str):void
 address PNsource
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to