Changeset: 77eb40fb1e56 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=77eb40fb1e56
Modified Files:
	sql/backends/monet5/iot/50_iot.sql
	sql/backends/monet5/iot/Tests/iot00.sql
	sql/backends/monet5/iot/Tests/iot05.sql
	sql/backends/monet5/iot/Tests/iot10.sql
	sql/backends/monet5/iot/Tests/iot99.sql
	sql/backends/monet5/iot/basket.c
	sql/backends/monet5/iot/basket.mal
	sql/backends/monet5/iot/iot.c
	sql/backends/monet5/iot/iot.h
	sql/backends/monet5/iot/iot.mal
	sql/backends/monet5/iot/petrinet.c
	sql/backends/monet5/iot/petrinet.h
	sql/backends/monet5/iot/petrinet.mal
	sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Intermittent commit diffs (truncated from 411 to 300 lines): diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql --- a/sql/backends/monet5/iot/50_iot.sql +++ b/sql/backends/monet5/iot/50_iot.sql @@ -17,49 +17,47 @@ create schema iot; +-- register and start a continuous query create procedure iot.query(qry string) external name iot.query; create procedure iot.query("schema" string, name string) external name iot.query; +-- pause the processing of a continuous query +create procedure iot.pause ("schema" string, name string) + external name iot.pause; + create procedure iot.pause () external name iot.pause; -create procedure iot.pause ("schema" string, name string) - external name iot.pause; +-- resume the processing of a continuous query +create procedure iot.resume ("schema" string, name string) + external name iot.resume; create procedure iot.resume () external name iot.resume; -create procedure iot.resume ("schema" string, name string) - external name iot.resume; +-- resume with limited the number of scheduler before next pause +create procedure iot.cycles(n integer) + external name iot.cycles; + +-- stop and remove a continuous query +create procedure iot.stop ("schema" string, name string) + external name iot.stop; create procedure iot.stop () external name iot.stop; -create procedure iot."drop" () - external name iot."drop"; - -create procedure iot.stop ("schema" string, name string) - external name iot.stop; - -create procedure iot.dump() - external name iot.dump; - -create procedure iot.petrinet() - external name iot.petrinet; - - -- Inspection tables --- create function iot.baskets() --- returns table( "schema" string, "table" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int) --- external name iot.baskets; --- --- create function iot.queries() --- returns table( "schema" string, "function" string, status string, lastrun timestamp, cycles int, events int, 
time bigint, error string) --- external name petrinet.queries; +create function iot.baskets() +returns table( "schema" string, "table" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int) +external name iot.baskets; + +create function iot.queries() + returns table( "schema" string, "function" string, status string, lastrun timestamp, cycles int, events int, time bigint, error string) + external name iot.queries; -- create function iot.errors() -- returns table( "schema" string, "table" string, error string) diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql --- a/sql/backends/monet5/iot/Tests/iot00.sql +++ b/sql/backends/monet5/iot/Tests/iot00.sql @@ -16,6 +16,4 @@ call iot.query('insert into result selec call iot.baskets(); call iot.petrinet(); -call iot.dump(); -call iot.drop(); -call iot.dump(); +call iot.stop(); diff --git a/sql/backends/monet5/iot/Tests/iot05.sql b/sql/backends/monet5/iot/Tests/iot05.sql --- a/sql/backends/monet5/iot/Tests/iot05.sql +++ b/sql/backends/monet5/iot/Tests/iot05.sql @@ -34,7 +34,4 @@ call iot.query('select 1;'); call iot.baskets(); call iot.scheduler(); -call iot.resume(); call iot.stop(); -call iot.drop(); -call iot.dump(); diff --git a/sql/backends/monet5/iot/Tests/iot10.sql b/sql/backends/monet5/iot/Tests/iot10.sql --- a/sql/backends/monet5/iot/Tests/iot10.sql +++ b/sql/backends/monet5/iot/Tests/iot10.sql @@ -13,9 +13,7 @@ begin tmp_count = tmp_total + (select count(*) from iot.stream_tmp); end; --- alternative is a simple query -iot.pause(); iot.query('iot','collector'); -iot.dump(); -iot.resume(); +iot.baskets(); +iot.queries(); iot.stop(); diff --git a/sql/backends/monet5/iot/Tests/iot99.sql b/sql/backends/monet5/iot/Tests/iot99.sql --- a/sql/backends/monet5/iot/Tests/iot99.sql +++ b/sql/backends/monet5/iot/Tests/iot99.sql @@ -2,8 +2,8 @@ set schema iot; set optimizer='iot_pipe'; -call iot.receptor(); drop procedure 
clk1; drop procedure clk3; drop procedure collector; +drop table result; drop table stream_tmp; diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -203,8 +203,8 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal *ret = 0; idx= BSKTlocate(sch,tbl); - if (idx <0) - throw(SQL,"iot.bind","Stream table not registered"); + if (idx <= 0) + throw(SQL,"iot.bind","Stream table '%s.%s' not registered",sch,tbl); for(i=0; i < baskets[idx].count; i++) if( strcmp(baskets[idx].cols[i], col)== 0 ){ @@ -225,7 +225,7 @@ str BSKTlock(void *ret, str *sch, str *t int bskt; bskt = BSKTlocate(*sch, *tbl); - if (bskt == 0) + if (bskt <= 0) throw(SQL, "basket.lock", "Could not find the basket %s.%s",*sch,*tbl); #ifdef _DEBUG_BASKET stream_printf(BSKTout, "lock group %s.%s\n", *sch, *tbl); diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal --- a/sql/backends/monet5/iot/basket.mal +++ b/sql/backends/monet5/iot/basket.mal @@ -69,7 +69,7 @@ command reset():void address BSKTreset comment "Remove all baskets"; -command baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], +command iot.baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], timestride:bat[:int], beat:bat[:int], seen:bat[:timestamp], events:bat[:int]) address BSKTtable comment "Inspect the iot baskets"; diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c --- a/sql/backends/monet5/iot/iot.c +++ b/sql/backends/monet5/iot/iot.c @@ -76,7 +76,7 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal InstrPtr p; Module scope; lng clk = GDKusec(); - char buf[BUFSIZ]; + char buf[BUFSIZ], name[IDLENGTH]; static int iotquerycnt=0; @@ -91,26 +91,28 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal nme = *getArgReference_str(stk, pci, 2); /* check existing of the 
pre-compiled function */ _DEBUG_IOT_ fprintf(stderr,"#iot: locate a SQL procedure %s.%s()\n",sch,nme); - msg = IOTprocedureStmt(cntxt, mb, sch, nme); - if (msg) - return msg; - s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme))); - if (s == NULL) - throw(SQL, "iot.query", "Definition missing"); - qry = s->def; } else if (pci->argc == 2){ + // pre-create the new procedure sch = "iot"; - snprintf(buf,BUFSIZ,"iot_%d",iotquerycnt++); - nme = buf; + snprintf(name, IDLENGTH,"cquery_%d",iotquerycnt++); def = *getArgReference_str(stk, pci, 1); - _DEBUG_IOT_ fprintf(stderr,"#iot: compile a compound expression %s()\n",def); // package it as a procedure in the current schema [todo] + snprintf(buf,BUFSIZ,"create procedure %s() begin %s; end",name,def); + _DEBUG_IOT_ fprintf(stderr,"#iot.compile: %s\n",buf); + nme = name; msg = SQLstatementIntern(cntxt, &def, nme, 1, 0, 0); if (msg) return msg; - qry = cntxt->curprg->def; } + msg = IOTprocedureStmt(cntxt, mb, sch, nme); + if (msg) + return msg; + s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme))); + if (s == NULL) + throw(SQL, "iot.query", "Definition missing"); + qry = s->def; + _DEBUG_IOT_ fprintf(stderr,"#iot: bake a new continuous query plan\n"); scope = findModule(cntxt->nspace, putName(sch, strlen(sch))); s = newFunction(putName(sch, strlen(sch)), putName(nme, strlen(nme)), FUNCTIONsymbol); @@ -156,15 +158,16 @@ IOTstop(Client cntxt, MalBlkPtr mb, MalS } str -IOTstep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +IOTcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - return PNstep(cntxt,mb,stk,pci); + return PNcycles(cntxt,mb,stk,pci); } - str -IOTdump(void *ret) +IOTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - BSKTdump(ret); - return PNdump(ret); + (void) cntxt; + (void) mb; + (void) stk; + (void) pci; + return MAL_SUCCEED; } - diff --git a/sql/backends/monet5/iot/iot.h b/sql/backends/monet5/iot/iot.h --- a/sql/backends/monet5/iot/iot.h +++ 
b/sql/backends/monet5/iot/iot.h @@ -44,6 +44,7 @@ iot_export str IOTquery(Client cntxt, Ma iot_export str IOTpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str IOTresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str IOTstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); -iot_export str IOTstep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +iot_export str IOTcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +iot_export str IOTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str IOTdump(void *ret); #endif diff --git a/sql/backends/monet5/iot/iot.mal b/sql/backends/monet5/iot/iot.mal --- a/sql/backends/monet5/iot/iot.mal +++ b/sql/backends/monet5/iot/iot.mal @@ -25,25 +25,25 @@ pattern query(name:str,def:str) address IOTquery comment "Add a new continuous query."; -pattern pause(nme:str):void +pattern pause(sch:str,nme:str):void address IOTpause comment "Pause a specific query"; -pattern resume(nme:str):void +pattern resume(sch:str,nme:str):void address IOTresume comment "(Re)start a query"; -pattern stop(nme:str):void +pattern stop(sch:str,nme:str):void address IOTstop -comment "Remove a query"; +comment "Remove a continuous query"; pattern pause() address IOTpause -comment "Pause all queries"; +comment "Pause all continuous queries"; pattern resume() address IOTresume -comment "Resume all queries."; +comment "Resume all continuous queries."; pattern stop():void address IOTstop @@ -57,15 +57,7 @@ pattern grab(dir:str):void address IOTgrab comment "Grab a directory with the binary files"; -command dump() -address IOTdump -comment "Dump iot status "; - -command queries()(nme:bat[:str], status:bat[:str], seen:bat[:timestamp], cycles:bat[:int], events:bat[:int], time:bat[:lng], error:bat[:str], def:bat[:str]) -address PNtable -comment "Return a table with queries registered"; - command errors()(nme:bat[:str],error:bat[:str]) address BSKTtableerrors 
-comment "Return a table the erroneous events"; +comment "Return a table the erroneous events found during query processing"; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list