Changeset: fd4f14e18be3 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fd4f14e18be3 Modified Files: sql/backends/monet5/datacell/50_datacell.sql sql/backends/monet5/datacell/Tests/scenario05.sql sql/backends/monet5/datacell/basket.mx sql/backends/monet5/datacell/datacell.mx sql/backends/monet5/datacell/petrinet.mx Branch: default Log Message:
Add table inspections The status of the baskets and queries can be inspected using SQL. diffs (truncated from 578 to 300 lines): diff --git a/sql/backends/monet5/datacell/50_datacell.sql b/sql/backends/monet5/datacell/50_datacell.sql --- a/sql/backends/monet5/datacell/50_datacell.sql +++ b/sql/backends/monet5/datacell/50_datacell.sql @@ -20,10 +20,6 @@ create procedure datacell.basket(tbl string) external name datacell.basket; -create function datacell.inventory() -returns table (kind string, nme string) - external name datacell.inventory; - create procedure datacell.receptor(tbl string, host string, portid integer) external name datacell.receptor; @@ -77,3 +73,14 @@ create function datacell.beat(bskt string, t integer) returns boolean external name datacell.beat; + +-- Inspection tables + +create function datacell.baskets() +returns table( nme string, threshold int, winsize int, winslide int, beat int, + seen timestamp, grabs int, events int) +external name datacell.baskets; + +create function datacell.queries() +returns table( nme string, status string, cycles int, events int, time bigint, error string, def string) +external name datacell.queries; diff --git a/sql/backends/monet5/datacell/Tests/scenario05.sql b/sql/backends/monet5/datacell/Tests/scenario05.sql --- a/sql/backends/monet5/datacell/Tests/scenario05.sql +++ b/sql/backends/monet5/datacell/Tests/scenario05.sql @@ -1,7 +1,7 @@ -- Scenario to exercise the datacell implementation -- using a single receptor and emitter -- this is the extended version of scenario00 --- with sliding beatdow and a 2 seconds delay +-- with sliding window and a 2 seconds delay create schema datacell; set optimizer='datacell_pipe'; @@ -23,6 +23,9 @@ call datacell.resume(); call datacell.dump(); +select * from datacell.baskets(); +select * from datacell.queries(); + -- externally, activate the sensor --sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1 -- externally, activate the actuator server to listen diff --git a/sql/backends/monet5/datacell/basket.mx b/sql/backends/monet5/datacell/basket.mx --- a/sql/backends/monet5/datacell/basket.mx +++ b/sql/backends/monet5/datacell/basket.mx @@ -72,6 +72,10 @@ address BSKTreset comment "Remove all baskets"; +command baskets():bat[:str,:bat] +address BSKTtable +comment "Inspect the datacell baskets"; + command dump() address BSKTdump comment "Dump the status of the basket table"; @@ -105,12 +109,14 @@ str name; /* table that represents the basket */ int threshold ; /* bound to determine scheduling eligibility */ int winsize, winslide; /* sliding window operations */ - lng lastseen; + lng seen; lng beat; /* milliseconds delay */ int colcount; str *cols; BAT **primary; /* statistics */ + int events; /* total number of events grabbed */ + int grabs; /* number of grabs */ } *BSKTbasket, BSKTbasketRec; datacell_export str schema_default; @@ -126,6 +132,7 @@ datacell_export str BSKTthreshold(int *ret, str *tbl, int *sz); datacell_export str BSKTbeat(int *ret, str *tbl, int *sz); datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide); +datacell_export str BSKTtable(int *ret); datacell_export str BSKTlock(int *ret, str *tbl, int *delay); datacell_export str BSKTunlock(int *ret, str *tbl); @@ -150,7 +157,7 @@ str schema_default = "datacell"; BSKTbasketRec *baskets; /* the datacell catalog */ -int bsktTop, bsktLimit = 0; +int bsktTop = 0, bsktLimit = 0; static MT_Lock bsktLock; #define lockBSKTbasketCatalog() mal_set_lock(bsktLock, "basket"); @@ -408,6 +415,7 @@ str tbl; int bskt, i, *ret; BAT *b,*bn = 0,*v; + int cnt; (void) cntxt; (void) mb; @@ -435,6 +443,7 @@ b = BATsetaccess(b, BAT_WRITE); BATins(b,v, FALSE); BATsetcount(bn, baskets[bskt].winsize); + cnt = (int) BATcount(bn); BBPunfix(v->batCacheid); } else { bn = BATcopy(b, b->htype, b->ttype,TRUE); @@ -449,6 +458,8 @@ if( bn) BBPunfix( bn->batCacheid); throw(MAL,"basket.grab","too early"); } + baskets[bskt].grabs++; + baskets[bskt].events += cnt; return MAL_SUCCEED; } @@ -578,8 +589,89 @@ throw(MAL,"basket.beat","Illegal value"); baskets[bskt].beat = *sz; *ret = TRUE; - if ( baskets[bskt].beat + baskets[bskt].lastseen > GDKusec() ) + if ( baskets[bskt].beat + baskets[bskt].seen > GDKusec() ) throw(MAL,"basket.heat","too early"); return MAL_SUCCEED; } + +/* provide a tabular view for inspection */ +str +BSKTtable(int *ret) +{ + BAT *bn, *name, *seen, *events, *grabs; + BAT *threshold, *winsize, *winslide, *beat; + int i; + + bn = BATnew(TYPE_str, TYPE_bat, BATTINY); + if ( bn == 0) + throw(MAL,"dictionary.baskets",MAL_MALLOC_FAIL); + + name = BATnew(TYPE_oid,TYPE_str, BATTINY); + if ( name == 0 ) goto wrapup; + BATseqbase(name,0); + threshold = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( threshold == 0 ) goto wrapup; + BATseqbase(threshold,0); + winsize = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( winsize == 0 ) goto wrapup; + BATseqbase(winsize,0); + winslide = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( winslide == 0 ) goto wrapup; + BATseqbase(winslide,0); + beat = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( beat == 0 ) goto wrapup; + BATseqbase(beat,0); + seen = BATnew(TYPE_oid,TYPE_timestamp, BATTINY); + if ( seen == 0 ) goto wrapup; + BATseqbase(seen,0); + grabs = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( grabs == 0 ) goto wrapup; + BATseqbase(grabs,0); + events = BATnew(TYPE_oid,TYPE_int, BATTINY); + if ( events == 0 ) goto wrapup; + BATseqbase(events,0); + + for ( i =1; i < bsktTop; i++) + if ( baskets[i].name ) { + BUNappend(name, baskets[i].name, FALSE); + BUNappend(threshold, &baskets[i].threshold, FALSE); + BUNappend(winsize, &baskets[i].winsize, FALSE); + BUNappend(winslide, &baskets[i].winslide, FALSE); + BUNappend(beat, &baskets[i].beat, FALSE); + BUNappend(seen, &baskets[i].seen, FALSE); + BUNappend(grabs, &baskets[i].grabs, FALSE); + BUNappend(events, &baskets[i].events, FALSE); + } + BUNins(bn,"nme", & name->batCacheid, FALSE); + BUNins(bn,"threshold", & threshold->batCacheid, FALSE); + BUNins(bn,"winsize", & winsize->batCacheid, FALSE); + BUNins(bn,"winslide", & winslide->batCacheid, FALSE); + BUNins(bn,"beat", & beat->batCacheid, FALSE); + BUNins(bn,"seen", & seen->batCacheid, FALSE); + BUNins(bn,"grabs", & grabs->batCacheid, FALSE); + BUNins(bn,"events", & events->batCacheid, FALSE); + + *ret = bn->batCacheid; + BBPkeepref(bn->batCacheid); + BBPkeepref(name->batCacheid); + BBPkeepref(threshold->batCacheid); + BBPkeepref(winsize->batCacheid); + BBPkeepref(winslide->batCacheid); + BBPkeepref(beat->batCacheid); + BBPkeepref(seen->batCacheid); + BBPkeepref(grabs->batCacheid); + BBPkeepref(events->batCacheid); + return MAL_SUCCEED; +wrapup: + if ( bn) BBPreleaseref(bn->batCacheid); + if ( name) BBPreleaseref(name->batCacheid); + if ( threshold) BBPreleaseref(threshold->batCacheid); + if ( winsize) BBPreleaseref(winsize->batCacheid); + if ( winslide) BBPreleaseref(winslide->batCacheid); + if ( beat) BBPreleaseref(beat->batCacheid); + if ( seen) BBPreleaseref(seen->batCacheid); + if ( grabs) BBPreleaseref(grabs->batCacheid); + if ( events) BBPreleaseref(events->batCacheid); + throw(MAL,"datacell.baskets",MAL_MALLOC_FAIL); +} @} diff --git a/sql/backends/monet5/datacell/datacell.mx b/sql/backends/monet5/datacell/datacell.mx --- a/sql/backends/monet5/datacell/datacell.mx +++ b/sql/backends/monet5/datacell/datacell.mx @@ -72,10 +72,6 @@ address DCpostlude comment "Stop the petrinet scheduler."; -command inventory():bat[:str,:bat] -address DCinventory -comment "Produce a tabular view of the datacell streaming components"; - command dump() address DCdump comment "Dump receptor/emitter status"; @@ -91,6 +87,15 @@ command beat(bskt:str, t:int):bit address DCbeat comment "Schedule query with T milliseconds interval"; + +command baskets():bat[:str,:bat] +address BSKTtable +comment "Inspect the datacell baskets"; + +command queries():bat[:str,:bat] +address PNtable +comment "Inspect the datacell queries"; + @{ @h #ifndef _DATACELLS_ @@ -137,7 +142,6 @@ datacell_export str DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); datacell_export str DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); -datacell_export str DCinventory(int *ret); #endif @c @@ -322,29 +326,6 @@ return MAL_SUCCEED; } -str -DCinventory(int *ret) -{ - BAT *b, *bn; - int i; - str msg = MAL_SUCCEED; - - b = BATnew(TYPE_str, TYPE_bat, BATTINY); - if (b == NULL) - throw(SQL, "datacell.inventory", MAL_MALLOC_FAIL); - for (i = 0; msg == MAL_SUCCEED && i < bsktTop; i++) { - bn = BATdescriptor(BBPindex(baskets[i].name)); - if (bn == NULL) - msg = createException(SQL, "datacell.inventory", MAL_MALLOC_FAIL); - else { - BUNins(b, baskets[i].name, &bn->batCacheid, FALSE); - BBPreleaseref(bn->batCacheid); - } - } - *ret = b->batCacheid; - BBPkeepref(b->batCacheid); - return msg; -} /* locate the MAL representation of this operation and extract the flow */ /* If the operation is not available yet, it should be compiled from its diff --git a/sql/backends/monet5/datacell/petrinet.mx b/sql/backends/monet5/datacell/petrinet.mx --- a/sql/backends/monet5/datacell/petrinet.mx +++ b/sql/backends/monet5/datacell/petrinet.mx @@ -29,17 +29,16 @@ @end example @mal module petrinet; -pattern register(nme:str, delay:int):void +pattern register(nme:str, def:str):void address PNregister comment "Add a continous query to the Petri net. It will analyse -the MAL block to determine the input/output dependencies. -The delay provides an indicative upperbound on how fast to react"; +the MAL block to determine the input/output dependencies. "; -command source(fcn:str, tbl:str):void +command source(nme:str, tbl:str):void address PNsource _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list