Changeset: 30283d4f77ec for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30283d4f77ec Removed Files: sql/backends/monet5/iot/Tests/iot01.sql Modified Files: monetdb5/mal/mal_instruction.c monetdb5/optimizer/opt_iot.c sql/backends/monet5/iot/50_iot.sql sql/backends/monet5/iot/Makefile.am sql/backends/monet5/iot/Tests/All sql/backends/monet5/iot/Tests/cleanup.sql sql/backends/monet5/iot/Tests/iot00.sql sql/backends/monet5/iot/Tests/iot02.sql sql/backends/monet5/iot/Tests/iot03.sql sql/backends/monet5/iot/Tests/iot10.sql sql/backends/monet5/iot/Tests/iot12.sql sql/backends/monet5/iot/Tests/iot15.sql sql/backends/monet5/iot/Tests/receptor01.sql sql/backends/monet5/iot/basket.c sql/backends/monet5/iot/basket.h sql/backends/monet5/iot/basket.mal sql/backends/monet5/iot/iot.c sql/backends/monet5/iot/iot.h sql/backends/monet5/iot/iot.mal sql/backends/monet5/iot/petrinet.c sql/backends/monet5/iot/petrinet.h sql/backends/monet5/iot/petrinet.mal sql/backends/monet5/sql_optimizer.c sql/backends/monet5/sql_scenario.c Branch: iot Log Message:
Merged with backend code diffs (truncated from 1908 to 300 lines): diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c --- a/monetdb5/mal/mal_instruction.c +++ b/monetdb5/mal/mal_instruction.c @@ -207,6 +207,7 @@ freeMalBlk(MalBlkPtr mb) if (mb->history) freeMalBlk(mb->history); + mb->history = 0; if (mb->binding) GDKfree(mb->binding); mb->binding = 0; diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c --- a/monetdb5/optimizer/opt_iot.c +++ b/monetdb5/optimizer/opt_iot.c @@ -34,7 +34,7 @@ #include "opt_dataflow.h" #define MAXBSKT 64 -#define isstream(S,T) \ +#define getStreamTableInfo(S,T) \ for(fnd=0, k= 0; k< btop; k++) \ if( strcmp(schemas[k], S)== 0 && strcmp(tables[k], T)== 0 ){ \ fnd= 1; break;\ @@ -50,13 +50,18 @@ OPTiotImplementation(Client cntxt, MalBl str tables[MAXBSKT]; int mvc[MAXBSKT]; int done[MAXBSKT]= {0}; - int btop=0; + int btop=0, lastmvc=0; int noerror=0; - int cq; + int cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0; + char buf[256]; + lng usec = GDKusec(); (void) pci; - cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0; + OPTDEBUGiot { + mnstr_printf(cntxt->fdout, "#iot optimizer start\n"); + printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG); + } old = mb->stmt; limit = mb->stop; slimit = mb->ssize; @@ -64,29 +69,27 @@ OPTiotImplementation(Client cntxt, MalBl /* first analyse the query for streaming tables */ for (i = 1; i < limit && btop <MAXBSKT; i++){ p = old[i]; - if( getModuleId(p)== basketRef && (getFunctionId(p)== registerRef || getFunctionId(p)== bindRef || getFunctionId(p)== clear_tableRef) ){ + if( getModuleId(p)== basketRef && (getFunctionId(p)== registerRef || getFunctionId(p)== bindRef ) ){ OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p)); - schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval; - tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval; - mvc[btop] = getArg(p,0); + schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval; + tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval; for( j =0; j< btop ; j++) if( strcmp(schemas[j], schemas[j+1])==0 && strcmp(tables[j],tables[j+1]) ==0) break; - mvc[j] = getArg(p,0); + lastmvc = mvc[j] = getArg(p,0); done[j]= done[j] || getFunctionId(p)== registerRef; if( j == btop) btop++; } - if( getModuleId(p)== basketRef && (getFunctionId(p) == appendRef || getFunctionId(p) == deleteRef )){ + if( getModuleId(p)== basketRef && getFunctionId(p) == appendRef ){ OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p)); schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval; tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval; - mvc[btop] = getArg(p,0); for( j =0; j< btop ; j++) if( strcmp(schemas[j], schemas[j+1])==0 && strcmp(tables[j],tables[j+1]) ==0) break; - mvc[j] = getArg(p,0); + lastmvc = mvc[j] = getArg(p,0); if( j == btop) btop++; } @@ -100,12 +103,19 @@ OPTiotImplementation(Client cntxt, MalBl if( j == btop) btop++; } + if( getModuleId(p)== sqlRef && getFunctionId(p) == appendRef ) + lastmvc = getArg(p,0); + if (!cq && getModuleId(p) == sqlRef && getFunctionId(p) == affectedRowsRef ) + lastmvc = getArg(p,0); + if( getModuleId(p)== iotRef && getFunctionId(p) == tumbleRef){ + lastmvc = getArg(p,1); + } } if( btop == MAXBSKT || btop == 0) return 0; OPTDEBUGiot { - mnstr_printf(cntxt->fdout, "#iot optimizer started\n"); + mnstr_printf(cntxt->fdout, "#iot optimizer started with %d streams, mvc %d\n", btop,lastmvc); printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG); } (void) stk; @@ -118,13 +128,6 @@ OPTiotImplementation(Client cntxt, MalBl return 0; pushInstruction(mb, old[0]); - // register all baskets used - for( j=0; j<btop; j++) - if( done[j]==0) { - p= newStmt(mb,basketRef,registerRef); - p= pushStr(mb,p, schemas[j]); - p= pushStr(mb,p, tables[j]); - } for (i = 1; i < limit; i++) if (old[i]) { p = old[i]; @@ -133,13 +136,30 @@ OPTiotImplementation(Client cntxt, MalBl freeInstruction(p); continue; } + if(getModuleId(p) == sqlRef && getFunctionId(p)== mvcRef){ + pushInstruction(mb,p); + k= getArg(p,0); + // register all baskets used + for( j=0; j<btop; j++) + if( done[j]==0) { + p= newStmt(mb,basketRef,registerRef); + p= pushArgument(mb,p,k); + p= pushStr(mb,p, schemas[j]); + p= pushStr(mb,p, tables[j]); + alias[k] = getArg(p,0); + } + continue; + } + // register all baskets used after the mvc had been determined if (getModuleId(p) == sqlRef && getFunctionId(p) == tidRef ){ - isstream(getVarConstant(mb,getArg(p,2)).val.sval, getVarConstant(mb,getArg(p,3)).val.sval ); + getStreamTableInfo(getVarConstant(mb,getArg(p,2)).val.sval, getVarConstant(mb,getArg(p,3)).val.sval ); + OPTDEBUGiot + mnstr_printf(cntxt->fdout, "#iot optimizer found stream %d\n",fnd); if( fnd){ alias[getArg(p,0)] = -1; freeInstruction(p); + continue; } - continue; } if (getModuleId(p) == algebraRef && getFunctionId(p) == projectionRef && alias[getArg(p,1)] < 0){ alias[getArg(p,0)] = getArg(p,2); @@ -148,16 +168,10 @@ OPTiotImplementation(Client cntxt, MalBl } if (getModuleId(p) == sqlRef && getFunctionId(p) == affectedRowsRef ){ - for(j = 0; j < btop; j++){ - r = newStmt(mb, basketRef, commitRef); - if (alias[mvc[j]] > 0) - r = pushArgument(mb,r, alias[mvc[j]]); - else - r = pushArgument(mb,r, mvc[j]); - r = pushStr(mb,r, schemas[j]); - r = pushStr(mb,r, tables[j]); - } - freeInstruction(p); + if(cq) + freeInstruction(p); + else + pushInstruction(mb,p); continue; } @@ -165,9 +179,17 @@ OPTiotImplementation(Client cntxt, MalBl noerror++; if (p->token == ENDsymbol && btop > 0 && noerror==0) { // empty all baskets used only when we are optimizing a cq - for( j=0; cq && j<btop; j++) + for(j = 0; cq && j < btop; j++){ + r = newStmt(mb, basketRef, tumbleRef); + r = pushArgument(mb,r, lastmvc); + r = pushStr(mb,r, schemas[j]); + r = pushStr(mb,r, tables[j]); + } + /* non-contiguous queries call for releasing the lock on the basket */ + for( j=0; !cq && j<btop; j++) if( done[j]==0) { - p= newStmt(mb,iotRef,tumbleRef); + p= newStmt(mb,basketRef,commitRef); + p= pushArgument(mb,p, lastmvc); p= pushStr(mb,p, schemas[j]); p= pushStr(mb,p, tables[j]); } @@ -208,7 +230,7 @@ OPTiotImplementation(Client cntxt, MalBl getArg(p, j) = alias[getArg(p, j)]; if (getModuleId(p) == sqlRef && getFunctionId(p) == appendRef ){ - isstream(getVarConstant(mb,getArg(p,2)).val.sval, getVarConstant(mb,getArg(p,3)).val.sval ); + //getStreamTableInfo(getVarConstant(mb,getArg(p,3)).val.sval, getVarConstant(mb,getArg(p,4)).val.sval ); /* the appends come in multiple steps. The first initializes an basket update statement, which is triggered when we commit the transaction. @@ -226,12 +248,16 @@ OPTiotImplementation(Client cntxt, MalBl chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE); chkFlow(cntxt->fdout, mb); chkDeclarations(cntxt->fdout, mb); + /* keep all actions taken as a post block comment */ + snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec","iot", btop, GDKusec() - usec); + newComment(mb,buf); OPTDEBUGiot { mnstr_printf(cntxt->fdout, "#iot optimizer final\n"); printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG); } GDKfree(alias); + GDKfree(old); return btop > 0; } diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql --- a/sql/backends/monet5/iot/50_iot.sql +++ b/sql/backends/monet5/iot/50_iot.sql @@ -24,17 +24,20 @@ create procedure iot.query(qry string) create procedure iot.query("schema" string, name string) external name iot.query; -create procedure iot.activate("schema" string, name string) - external name iot.activate; +create procedure iot.resume("schema" string, name string) + external name iot.resume; -create procedure iot.activate() - external name iot.activate; +create procedure iot.resume() + external name iot.resume; -create procedure iot.deactivate("schema" string, name string) - external name iot.deactivate; +create procedure iot.pause("schema" string, name string) + external name iot.pause; -create procedure iot.deactivate() - external name iot.deactivate; +create procedure iot.pause() + external name iot.pause; + +create procedure iot.wait(cycles integer) + external name iot.wait; create procedure iot.deregister("schema" string, name string) external name iot.deregister; @@ -61,17 +64,10 @@ create procedure iot.emitter("schema" st external name iot.emitter; -create procedure iot.threshold("schema" string, "table" string, elem int) - external name iot.threshold; - -create procedure iot.beat("schema" string, "table" string, msec int) - external name iot.beat; +create procedure iot.heartbeat("schema" string, "table" string, msec int) + external name iot.heartbeat; -- cleaup activities -create procedure iot.tumble() - external name iot.tumble; -create procedure iot.tumble("schema" string, "table" string) - external name iot.tumble; create procedure iot.tumble("schema" string, "table" string, elem int) external name iot.tumble; @@ -81,11 +77,9 @@ create procedure iot.window("schema" str create procedure iot.window("schema" string, "table" string, elem int, slide int) external name iot.window; -create procedure iot.wait() external name iot.wait; - -- Inspection tables create function iot.baskets() -returns table( "schema" string, "table" string, "status" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int) +returns table( "schema" string, "table" string, "status" string, threshold int, winsize int, winstride int, timeslice int, timestride int, heartbeat int, seen timestamp, events int) external name iot.baskets; create function iot.queries() diff --git a/sql/backends/monet5/iot/Makefile.am b/sql/backends/monet5/iot/Makefile.am --- a/sql/backends/monet5/iot/Makefile.am +++ b/sql/backends/monet5/iot/Makefile.am @@ -13,9 +13,9 @@ install-exec-local-50_iot.mal: 50_iot.ma uninstall-local-50_iot.mal: $(RM) $(DESTDIR)$(libdir)/monetdb5/autoload/50_iot.mal -iot.o iot.lo: iot.c iot.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h petrinet.h ../../../../monetdb5/optimizer/opt_prelude.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_pipes.h ../../../../monetdb5/optimizer/opt_iot.h ../sql_optimizer.h ../sql_gencode.h -petrinet.o petrinet.lo: petrinet.c iot.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h petrinet.h ../../../../monetdb5/mal/mal_builder.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_prelude.h -basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h petrinet.h ../../../../monetdb5/mal/mal_instruction.h ../../../../monetdb5/mal/mal_type.h ../../../../monetdb5/mal/mal_stack.h ../../../../monetdb5/mal/mal_namespace.h ../../../../monetdb5/mal/mal_errors.h ../../../../monetdb5/mal/mal_exception.h ../../../../monetdb5/mal/mal_builder.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_prelude.h +iot.o iot.lo: iot.c ../sql_optimizer.h ../sql_gencode.h ../../../../monetdb5/optimizer/opt_prelude.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_pipes.h ../../../../monetdb5/optimizer/opt_iot.h basket.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h iot.h ../sql_scenario.h petrinet.h +petrinet.o petrinet.lo: petrinet.c iot.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h petrinet.h basket.h ../../../../monetdb5/mal/mal_builder.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_prelude.h +basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h ../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h ../../../../monetdb5/mal/mal_instruction.h ../../../../monetdb5/mal/mal_type.h ../../../../monetdb5/mal/mal_stack.h ../../../../monetdb5/mal/mal_namespace.h ../../../../monetdb5/mal/mal_errors.h ../../../../monetdb5/mal/mal_exception.h ../../../../monetdb5/mal/mal_builder.h ../../../../monetdb5/optimizer/opt_support.h ../../../../monetdb5/optimizer/../mal/mal.h ../../../../monetdb5/optimizer/../mal/mal_function.h ../../../../monetdb5/optimizer/../mal/mal_scenario.h ../../../../monetdb5/optimizer/../mal/mal_builder.h ../../../../monetdb5/optimizer/opt_prelude.h install-exec-local-basket.mal: basket.mal -mkdir -p $(DESTDIR)$(libdir)/monetdb5 -$(RM) $(DESTDIR)$(libdir)/monetdb5/basket.mal @@ -53,11 +53,11 @@ lib_iot_la_CFLAGS=-DLIBIOT $(AM_CFLAGS) iotdir = $(libdir)/monetdb5 lib_iot_la_LIBADD = ../../../../monetdb5/tools/libmonetdb5.la ../../../../gdk/libbat.la _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list