Changeset: 30283d4f77ec for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30283d4f77ec
Removed Files:
        sql/backends/monet5/iot/Tests/iot01.sql
Modified Files:
        monetdb5/mal/mal_instruction.c
        monetdb5/optimizer/opt_iot.c
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/Makefile.am
        sql/backends/monet5/iot/Tests/All
        sql/backends/monet5/iot/Tests/cleanup.sql
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/Tests/iot02.sql
        sql/backends/monet5/iot/Tests/iot03.sql
        sql/backends/monet5/iot/Tests/iot10.sql
        sql/backends/monet5/iot/Tests/iot12.sql
        sql/backends/monet5/iot/Tests/iot15.sql
        sql/backends/monet5/iot/Tests/receptor01.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/iot.h
        sql/backends/monet5/iot/iot.mal
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
        sql/backends/monet5/sql_optimizer.c
        sql/backends/monet5/sql_scenario.c
Branch: iot
Log Message:

Merged with backend code


diffs (truncated from 1908 to 300 lines):

diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -207,6 +207,7 @@ freeMalBlk(MalBlkPtr mb)
 
        if (mb->history)
                freeMalBlk(mb->history);
+       mb->history = 0;
        if (mb->binding)
                GDKfree(mb->binding);
        mb->binding = 0;
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -34,7 +34,7 @@
 #include "opt_dataflow.h"
 
 #define MAXBSKT 64
-#define isstream(S,T) \
+#define getStreamTableInfo(S,T) \
        for(fnd=0, k= 0; k< btop; k++) \
        if( strcmp(schemas[k], S)== 0 && strcmp(tables[k], T)== 0 ){ \
                fnd= 1; break;\
@@ -50,13 +50,18 @@ OPTiotImplementation(Client cntxt, MalBl
        str  tables[MAXBSKT];
        int  mvc[MAXBSKT];
        int done[MAXBSKT]= {0};
-       int btop=0;
+       int btop=0, lastmvc=0;
        int noerror=0;
-       int cq;
+       int cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
+       char buf[256];
+       lng usec = GDKusec();
 
        (void) pci;
 
-       cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
+       OPTDEBUGiot {
+               mnstr_printf(cntxt->fdout, "#iot optimizer start\n");
+               printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
+       } 
        old = mb->stmt;
        limit = mb->stop;
        slimit = mb->ssize;
@@ -64,29 +69,27 @@ OPTiotImplementation(Client cntxt, MalBl
        /* first analyse the query for streaming tables */
        for (i = 1; i < limit && btop <MAXBSKT; i++){
                p = old[i];
-               if( getModuleId(p)== basketRef && (getFunctionId(p)== 
registerRef || getFunctionId(p)== bindRef || getFunctionId(p)== clear_tableRef) 
 ){
+               if( getModuleId(p)== basketRef && (getFunctionId(p)== 
registerRef || getFunctionId(p)== bindRef )  ){
                        OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p));
-                       schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
-                       tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
-                       mvc[btop] = getArg(p,0);
+                       schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+                       tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[j+1])==0  && 
strcmp(tables[j],tables[j+1]) ==0)
                                break;
-                       mvc[j] = getArg(p,0);
+                       lastmvc = mvc[j] = getArg(p,0);
                        done[j]= done[j] || getFunctionId(p)== registerRef;
                        if( j == btop)
                                btop++;
                }
-               if( getModuleId(p)== basketRef && (getFunctionId(p) == 
appendRef || getFunctionId(p) == deleteRef )){
+               if( getModuleId(p)== basketRef && getFunctionId(p) == appendRef 
){
                        OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p));
                        schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
-                       mvc[btop] = getArg(p,0);
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[j+1])==0  && 
strcmp(tables[j],tables[j+1]) ==0)
                                break;
 
-                       mvc[j] = getArg(p,0);
+                       lastmvc  = mvc[j] = getArg(p,0);
                        if( j == btop)
                                btop++;
                }
@@ -100,12 +103,19 @@ OPTiotImplementation(Client cntxt, MalBl
                        if( j == btop)
                                btop++;
                }
+               if( getModuleId(p)== sqlRef && getFunctionId(p) == appendRef )
+                       lastmvc = getArg(p,0);
+               if (!cq && getModuleId(p) == sqlRef && getFunctionId(p) == 
affectedRowsRef )
+                       lastmvc = getArg(p,0);
+               if( getModuleId(p)== iotRef && getFunctionId(p) == tumbleRef){
+                       lastmvc = getArg(p,1);
+               }
        }
        if( btop == MAXBSKT || btop == 0)
                return 0;
 
        OPTDEBUGiot {
-               mnstr_printf(cntxt->fdout, "#iot optimizer started\n");
+               mnstr_printf(cntxt->fdout, "#iot optimizer started with %d streams, mvc %d\n", btop,lastmvc);
                printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
        }
                (void) stk;
@@ -118,13 +128,6 @@ OPTiotImplementation(Client cntxt, MalBl
                return 0;
 
        pushInstruction(mb, old[0]);
-       // register all baskets used
-       for( j=0; j<btop; j++)
-       if( done[j]==0) {
-               p= newStmt(mb,basketRef,registerRef);
-               p= pushStr(mb,p, schemas[j]);
-               p= pushStr(mb,p, tables[j]);
-       }
        for (i = 1; i < limit; i++)
                if (old[i]) {
                        p = old[i];
@@ -133,13 +136,30 @@ OPTiotImplementation(Client cntxt, MalBl
                                freeInstruction(p);
                                continue;
                        }
+                       if(getModuleId(p) == sqlRef && getFunctionId(p)== 
mvcRef){
+                               pushInstruction(mb,p);
+                               k= getArg(p,0);
+                               // register all baskets used
+                               for( j=0; j<btop; j++)
+                               if( done[j]==0) {
+                                       p= newStmt(mb,basketRef,registerRef);
+                                       p= pushArgument(mb,p,k);
+                                       p= pushStr(mb,p, schemas[j]);
+                                       p= pushStr(mb,p, tables[j]);
+                                       alias[k] = getArg(p,0);
+                               }
+                               continue;
+                       }
+                       // register all baskets used after the mvc had been determined
                        if (getModuleId(p) == sqlRef && getFunctionId(p) == 
tidRef ){
-                               
isstream(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
+                               
getStreamTableInfo(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
+                               OPTDEBUGiot 
+                                       mnstr_printf(cntxt->fdout, "#iot optimizer found stream %d\n",fnd);
                                if( fnd){
                                        alias[getArg(p,0)] = -1;
                                        freeInstruction(p);
+                                       continue;
                                }
-                               continue;
                        }
                        if (getModuleId(p) == algebraRef && getFunctionId(p) == 
projectionRef && alias[getArg(p,1)] < 0){
                                alias[getArg(p,0)] = getArg(p,2);
@@ -148,16 +168,10 @@ OPTiotImplementation(Client cntxt, MalBl
                        }
 
                        if (getModuleId(p) == sqlRef && getFunctionId(p) == 
affectedRowsRef ){
-                               for(j = 0; j < btop; j++){
-                                       r =  newStmt(mb, basketRef, commitRef);
-                                       if (alias[mvc[j]] > 0)
-                                               r =  pushArgument(mb,r, 
alias[mvc[j]]);
-                                       else
-                                               r =  pushArgument(mb,r, mvc[j]);
-                                       r =  pushStr(mb,r, schemas[j]);
-                                       r =  pushStr(mb,r, tables[j]);
-                               }
-                               freeInstruction(p);
+                               if(cq)
+                                       freeInstruction(p);
+                               else 
+                                       pushInstruction(mb,p);
                                continue;
                        }
 
@@ -165,9 +179,17 @@ OPTiotImplementation(Client cntxt, MalBl
                                noerror++;
                        if (p->token == ENDsymbol && btop > 0 && noerror==0) {
                                // empty all baskets used only when we are optimizing a cq
-                               for( j=0; cq && j<btop; j++)
+                               for(j = 0; cq && j < btop; j++){
+                                       r =  newStmt(mb, basketRef, tumbleRef);
+                                       r =  pushArgument(mb,r, lastmvc);
+                                       r =  pushStr(mb,r, schemas[j]);
+                                       r =  pushStr(mb,r, tables[j]);
+                               }
+                               /* non-contiguous queries call for releasing 
the lock on the basket */
+                               for( j=0; !cq && j<btop; j++)
                                if( done[j]==0) {
-                                       p= newStmt(mb,iotRef,tumbleRef);
+                                       p= newStmt(mb,basketRef,commitRef);
+                                       p= pushArgument(mb,p, lastmvc);
                                        p= pushStr(mb,p, schemas[j]);
                                        p= pushStr(mb,p, tables[j]);
                                }
@@ -208,7 +230,7 @@ OPTiotImplementation(Client cntxt, MalBl
                                        getArg(p, j) = alias[getArg(p, j)];
 
                        if (getModuleId(p) == sqlRef && getFunctionId(p) == 
appendRef ){
-                               
isstream(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
+                               //getStreamTableInfo(getVarConstant(mb,getArg(p,3)).val.sval, getVarConstant(mb,getArg(p,4)).val.sval );
                                /* the appends come in multiple steps.
                                   The first initializes an basket update 
statement,
                                   which is triggered when we commit the 
transaction.
@@ -226,12 +248,16 @@ OPTiotImplementation(Client cntxt, MalBl
        chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE);
        chkFlow(cntxt->fdout, mb);
        chkDeclarations(cntxt->fdout, mb);
+    /* keep all actions taken as a post block comment */
+    snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec","iot", btop, 
GDKusec() - usec);
+    newComment(mb,buf);
 
        OPTDEBUGiot {
                mnstr_printf(cntxt->fdout, "#iot optimizer final\n");
                printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
        } 
        GDKfree(alias);
+       GDKfree(old);
        return btop > 0;
 }
 
diff --git a/sql/backends/monet5/iot/50_iot.sql 
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -24,17 +24,20 @@ create procedure iot.query(qry string)
 create procedure iot.query("schema" string, name string)
        external name iot.query;
 
-create procedure iot.activate("schema" string, name string)
-       external name iot.activate;
+create procedure iot.resume("schema" string, name string)
+       external name iot.resume;
 
-create procedure iot.activate()
-       external name iot.activate;
+create procedure iot.resume()
+       external name iot.resume;
 
-create procedure iot.deactivate("schema" string, name string)
-       external name iot.deactivate;
+create procedure iot.pause("schema" string, name string)
+       external name iot.pause;
 
-create procedure iot.deactivate()
-       external name iot.deactivate;
+create procedure iot.pause()
+       external name iot.pause;
+
+create procedure iot.wait(cycles integer)
+       external name iot.wait;
 
 create procedure iot.deregister("schema" string, name string)
        external name iot.deregister;
@@ -61,17 +64,10 @@ create procedure iot.emitter("schema" st
        external name iot.emitter;
 
 
-create procedure iot.threshold("schema" string, "table" string, elem int)
-       external name iot.threshold;
-
-create procedure iot.beat("schema" string, "table" string, msec int)
-       external name iot.beat;
+create procedure iot.heartbeat("schema" string, "table" string, msec int)
+       external name iot.heartbeat;
 
 -- cleaup activities 
-create procedure iot.tumble()
-       external name iot.tumble;
-create procedure iot.tumble("schema" string, "table" string)
-       external name iot.tumble;
 create procedure iot.tumble("schema" string, "table" string, elem int)
        external name iot.tumble;
 
@@ -81,11 +77,9 @@ create procedure iot.window("schema" str
 create procedure iot.window("schema" string, "table" string, elem int, slide 
int)
        external name iot.window;
 
-create procedure iot.wait() external name iot.wait;
-
 -- Inspection tables
 create function iot.baskets()
-returns table( "schema" string, "table" string, "status" string, threshold 
int, winsize int, winstride int, timeslice int, timestride int, beat int, seen 
timestamp, events int)
+returns table( "schema" string, "table" string, "status" string, threshold 
int, winsize int, winstride int, timeslice int, timestride int, heartbeat int, 
seen timestamp, events int)
 external name iot.baskets;
 
 create function iot.queries()
diff --git a/sql/backends/monet5/iot/Makefile.am 
b/sql/backends/monet5/iot/Makefile.am
--- a/sql/backends/monet5/iot/Makefile.am
+++ b/sql/backends/monet5/iot/Makefile.am
@@ -13,9 +13,9 @@ install-exec-local-50_iot.mal: 50_iot.ma
 uninstall-local-50_iot.mal: 
        $(RM) $(DESTDIR)$(libdir)/monetdb5/autoload/50_iot.mal
 
-iot.o iot.lo: iot.c iot.h ../../../../monetdb5/mal/../../gdk/gdk.h 
../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h 
../sql.h ../sql_scenario.h basket.h petrinet.h 
../../../../monetdb5/optimizer/opt_prelude.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_pipes.h 
../../../../monetdb5/optimizer/opt_iot.h ../sql_optimizer.h ../sql_gencode.h
-petrinet.o petrinet.lo: petrinet.c iot.h 
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h 
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h 
petrinet.h ../../../../monetdb5/mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_prelude.h
-basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h 
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h 
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h 
petrinet.h ../../../../monetdb5/mal/mal_instruction.h 
../../../../monetdb5/mal/mal_type.h ../../../../monetdb5/mal/mal_stack.h 
../../../../monetdb5/mal/mal_namespace.h ../../../../monetdb5/mal/mal_errors.h 
../../../../monetdb5/mal/mal_exception.h ../../../../monetdb5/mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_prelude.h
+iot.o iot.lo: iot.c ../sql_optimizer.h ../sql_gencode.h 
../../../../monetdb5/optimizer/opt_prelude.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_pipes.h 
../../../../monetdb5/optimizer/opt_iot.h basket.h 
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h 
../../../../monetdb5/mal/mal_interpreter.h ../sql.h iot.h ../sql_scenario.h 
petrinet.h
+petrinet.o petrinet.lo: petrinet.c iot.h 
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h 
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h 
petrinet.h basket.h ../../../../monetdb5/mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_prelude.h
+basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h 
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h 
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h 
../../../../monetdb5/mal/mal_instruction.h ../../../../monetdb5/mal/mal_type.h 
../../../../monetdb5/mal/mal_stack.h ../../../../monetdb5/mal/mal_namespace.h 
../../../../monetdb5/mal/mal_errors.h ../../../../monetdb5/mal/mal_exception.h 
../../../../monetdb5/mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_support.h 
../../../../monetdb5/optimizer/../mal/mal.h 
../../../../monetdb5/optimizer/../mal/mal_function.h 
../../../../monetdb5/optimizer/../mal/mal_scenario.h 
../../../../monetdb5/optimizer/../mal/mal_builder.h 
../../../../monetdb5/optimizer/opt_prelude.h
 install-exec-local-basket.mal: basket.mal
        -mkdir -p $(DESTDIR)$(libdir)/monetdb5
        -$(RM) $(DESTDIR)$(libdir)/monetdb5/basket.mal
@@ -53,11 +53,11 @@ lib_iot_la_CFLAGS=-DLIBIOT $(AM_CFLAGS)
 iotdir = $(libdir)/monetdb5
 lib_iot_la_LIBADD = ../../../../monetdb5/tools/libmonetdb5.la 
../../../../gdk/libbat.la
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to