Changeset: 63082c33014d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=63082c33014d
Modified Files:
        sql/backends/monet5/datacell/50_datacell.sql
        sql/backends/monet5/datacell/Makefile.ag
        sql/backends/monet5/datacell/datacell.mx
        sql/backends/monet5/datacell/opt_datacell.mx
        sql/backends/monet5/datacell/petrinet.mx
        sql/backends/monet5/datacell/receptor.mx
        sql/backends/monet5/datacell/sensor.mx
Branch: default
Log Message:

Receptors and scheduler
The events are collected from the channel and retained in the basket.
The datacell compiler takes the continuous query expressed as ordinary
SQL and compiles it into a plan over baskets, i.e. tables living
in the datacell schema.
The petrinet scheduler organises the queries in a dependency graph
and will execute them (as MAL functions) upon arrival of events.


diffs (truncated from 870 to 300 lines):

diff --git a/sql/backends/monet5/datacell/50_datacell.sql b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -48,11 +48,21 @@
 create procedure datacell.query(sch string, proc string)
        external name datacell.query;
 
+create procedure datacell.register(sch string, proc string)
+       external name datacell.register;
+
+-- scheduler activation
 create procedure datacell.prepare()
        external name datacell.prepare;
 
 create procedure datacell.finish()
        external name datacell.finish;
 
+create procedure datacell.pause()
+       external name datacell.pause;
+
+create procedure datacell.resume()
+       external name datacell.resume;
+
 create procedure datacell.dump()
        external name datacell.dump;
diff --git a/sql/backends/monet5/datacell/Makefile.ag b/sql/backends/monet5/datacell/Makefile.ag
--- a/sql/backends/monet5/datacell/Makefile.ag
+++ b/sql/backends/monet5/datacell/Makefile.ag
@@ -40,6 +40,7 @@
                  receptor.mx \
                  petrinet.mx \
                  dcoperator.mx \
+                 opt_datacell.mx \
                  datacell.mx \
                  emitter.mx 
 
@@ -72,6 +73,7 @@
                  emitter.mal \
                  petrinet.mal \
                  datacell.mal \
+                 opt_datacell.mal \
                  dcoperator.mal
 }
 
diff --git a/sql/backends/monet5/datacell/datacell.mx b/sql/backends/monet5/datacell/datacell.mx
--- a/sql/backends/monet5/datacell/datacell.mx
+++ b/sql/backends/monet5/datacell/datacell.mx
@@ -60,6 +60,15 @@
 address DCquery
 comment "Add a new continuous query.";
 
+pattern register(name:str,def:str, delay:int):void
+address PNregistration
+comment "Add a new continuous query to the scheduler with an
+indicative maximum delady of scheduling.";
+
+pattern register(name:str,def:str):void
+address PNregistration
+comment "Add a new continuous query to the scheduler.";
+
 pattern start()
 address DCstartScheduler
 comment "Convert the datacell schema to a stream processing infrastructure";
@@ -130,6 +139,9 @@
 #endif
 @c
 #include "datacell.h"
+#include "opt_datacell.h"
+#include "sql_optimizer.h"
+
 #ifdef WIN32
 #include "winsock2.h"
 #endif
@@ -260,8 +272,12 @@
        str msg;
        InstrPtr p;
        Module scope;
+       lng clk = GDKusec();
 
        (void) mb;
+       /* check if the argument denotes a procedure name */
+       /* if so, get its definition to be compiled */
+
        msg = SQLstatementIntern(cntxt, def, nme, 0, 0);
        /* if ( msg )
                return msg; */
@@ -275,7 +291,18 @@
        setModuleId(p, putName("datacell",8));
        setFunctionId(p, putName(nme,strlen(nme)));
     insertSymbol(scope,s);
-       PNanalysis(cntxt,s->def);
+       /* printFunction(cntxt->fdout, s->def, 0, LIST_MAL_STMT);*/
+       /* optimize the code and register at scheduler */
+       if ( msg == MAL_SUCCEED) {
+               OPTdatacellImplementation(cntxt,s->def,0,0);
+               addOptimizers(cntxt,s->def,0);
+               if ( msg == MAL_SUCCEED)
+                       msg = optimizeMALBlock(cntxt,s->def);
+               if ( msg == MAL_SUCCEED)
+                       msg = optimizerCheck(cntxt, s->def, 
"optimizer.datacell", 1, GDKusec() - clk, OPT_CHECK_ALL);
+               addtoMalBlkHistory(mb, "datacell");
+       }
+       printFunction(cntxt->fdout, s->def, 0, LIST_MAL_STMT);
        return msg;
 }
 
@@ -332,8 +359,6 @@
 DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        int ret=0;
-       if ( DCinitialized != DCPAUSED)
-               throw(MAL,"datacell.pause","Datacell already running");
        PNresumeScheduler(&ret);
        (void) cntxt;
        (void) mb;
@@ -377,6 +402,7 @@
        BSKTdump(ret);
        RCdump();
        EMdump();
+       PNdump(ret);
        return MAL_SUCCEED;
 }
 @}
diff --git a/sql/backends/monet5/datacell/opt_datacell.mx b/sql/backends/monet5/datacell/opt_datacell.mx
--- a/sql/backends/monet5/datacell/opt_datacell.mx
+++ b/sql/backends/monet5/datacell/opt_datacell.mx
@@ -39,6 +39,7 @@
 
 #define OPTDEBUGdatacell  if (optDebug & (1 << DEBUG_OPT_DATACELL))
 opt_export str OPTdatacell(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
p);
+opt_export int OPTdatacellImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci);
 
 #endif
 @-
@@ -49,14 +50,23 @@
 #include "opt_deadcode.h"
 #include "mal_interpreter.h"   /* for showErrors() */
 #include "mal_builder.h"
+#include "basket.h"
+#include "sql_optimizer.h"
 #include "opt_statistics.h"
+#include "opt_dataflow.h"
 
-static int
+int
 OPTdatacellImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci)
 {
-       int actions = 0;
-    int i, j,limit, vlimit, slimit;
+       int actions = 0, fnd;
+    int bskt, i, j, k, limit, vlimit, slimit;
     InstrPtr p, *old;
+       str schema, table;
+       int maxbasket= 32, m=0;
+       char *schemas[32], *tables[32];
+       InstrPtr q[32];
+       lng clk,t;
+       int *alias;
 
        (void) pci;
 
@@ -72,31 +82,82 @@
     if ( newMalBlkStmt(mb, slimit) < 0) 
         return 0;
 
-    for (i = 0; i < limit; i++) {
+       alias = (int*) GDKzalloc(mb->vtop * 2 * sizeof(int));
+       if ( alias == 0)
+               return 0;
+       removeDataflow(old,limit);
+
+    for (i = 0; i < limit; i++) 
+       if( old[i]) {
         p = old[i];
-               if ( getModuleId(p) == sqlRef ){
-                       if( getFunctionId(p ) == bindRef  && getVarConstant(mb, 
getArg(p, p->argc-1)).val.ival == 0) {
-                               /* only the primary BAT is used */
-                               pushInstruction(mb, p);
+               if ( getModuleId(p) == sqlRef  && (getFunctionId(p) == bindRef 
||getFunctionId(p) == binddbatRef)){
+                       schema = getVarConstant(mb, getArg(p,2)).val.sval;
+                       table = getVarConstant(mb, getArg(p,3)).val.sval;
+                       bskt = BSKTlocate(schema,table);
+                       if ( bskt ){
+                               for( j =0; j< m; j++)
+                                       if ( strcmp(schema,schemas[j])== 0 && 
strcmp(table,tables[j])==0)
+                                               break;
+                               if ( j == m ){
+                                       if ( m == maxbasket) return 0;
+                                       /* grab the basket tables by swapping 
the BATs used for the SQL table */
+                                       q[m]= 
BSKTgrabInstruction(mb,schema,table);
+                                       schemas[m]= schema;
+                                       tables[m++]= table;
+                                       actions = 1;
+                               }
+                       }
+
+                       fnd = 0;
+                       if (  getFunctionId(p) == bindRef && getVarConstant(mb, 
getArg(p, p->argc-1)).val.ival == 0  ){
+                               /* only the primary BAT is used and inject the 
call to empty the basket. */
+                               for( j = 0; fnd == 0 && j < 
baskets[bskt].colcount; j++)
+                               if ( strcmp( baskets[bskt].cols[j], 
getVarConstant(mb, getArg(p,4)).val.sval) == 0){
+                                       for( k =0; k< m; k++)
+                                               if ( 
strcmp(schema,schemas[k])== 0 && strcmp(table,tables[k])==0)
+                                                       break;
+                                       alias[getArg(p,0)] = getArg(q[k],j);
+                                       fnd = 1;
+                                       break;
+                               }       
+                               if ( fnd == 0)
+                                       pushInstruction(mb,p);
+                               else
+                                       freeInstruction(p);
                                continue;
                        } 
-                       /* zap all expression arguments */
-                       clrFunction(p);
-                       p->argc= p->retc;
-                       for ( j=0; j< p->retc; j++ )
-                               p = pushArgument(mb, p, newTmpVariable(mb, 
getArgType(mb,p,j)) );
+                       if ( bskt) {
+                               wrd rows = 0;
+                               ValRecord vr;
+                               /* zap all expression arguments */
+                               clrFunction(p);
+                               p->argc= p->retc;
+                               for ( j=0; j< p->retc; j++ )
+                                       p = pushNil(mb,p,getArgType(mb,p,j));
+                               varSetProp(mb, p->argv[p->argc-1], rowsProp, 
op_eq, VALset(&vr, TYPE_wrd, &rows));
+                       }
                } 
+               for ( j=0; j< p->argc; j++)
+               if( alias[getArg(p,j)] )
+                       getArg(p,j) = alias[getArg(p,j)];
                pushInstruction(mb, p);
        }
        (void)stk;
        (void)pci;
 
        if (actions)
+       {
                addPipeDefinition("datacell_pipe",
                        
"inline,remap,datacell,evaluate,costModel,coercions,emptySet,aliases,mitosis,"
                        
"mergetable,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,"
                        "history,multiplex,accumulators,garbageCollector");
-       return 0;
+               /* extend the plan with the new optimizer pipe required */
+               clk= GDKusec();
+               optimizerCheck(cntxt, mb, "optimizer.datacell", 1, t = 
(GDKusec() - clk), OPT_CHECK_ALL);
+               addtoMalBlkHistory(mb, "datacell");
+       }
+       GDKfree(alias);
+       return actions;
 }
 
 /* #define _DEBUG_OPTIMIZER_*/
@@ -145,13 +206,17 @@
                return MAL_SUCCEED;
        }
        actions = OPTdatacellImplementation(cntxt, mb, stk, p);
-       msg = optimizerCheck(cntxt, mb, "optimizer.datacell", actions, t = 
(GDKusec() - clk), OPT_CHECK_ALL);
+       addOptimizers(cntxt,mb,0);
+       if ( msg == MAL_SUCCEED)
+               msg = optimizeMALBlock(cntxt,mb);
+       if ( msg == MAL_SUCCEED)
+               msg = optimizerCheck(cntxt, mb, "optimizer.datacell", actions, 
t = (GDKusec() - clk), OPT_CHECK_ALL);
        OPTDEBUGdatacell {
                mnstr_printf(cntxt->fdout, "=FINISHED datacell %d\n", actions);
                printFunction(cntxt->fdout, mb, 0, LIST_MAL_STMT | LIST_MAPI);
        }
        DEBUGoptimizers
-       mnstr_printf(cntxt->fdout, "#opt_reduce: %d ms\n", t);
+               mnstr_printf(cntxt->fdout, "#opt_reduce: %d ms\n", t);
        QOTupdateStatistics("datacell", actions, t);
        addtoMalBlkHistory(mb, "datacell");
        return msg;
diff --git a/sql/backends/monet5/datacell/petrinet.mx b/sql/backends/monet5/datacell/petrinet.mx
--- a/sql/backends/monet5/datacell/petrinet.mx
+++ b/sql/backends/monet5/datacell/petrinet.mx
@@ -32,7 +32,8 @@
 pattern registration(mod:str,fcn:str, delay:int):void
 address PNregistration
 comment "Add a continous query to the Petri net. It will analyse
-the MAL block to determine the input/output dependencies.";
+the MAL block to determine the input/output dependencies. 
+The delay provides an indicative upperbound on how fast to react";
 
 command source(mod:str,fcn:str, schema:str,tbl:str):void
 address PNsource
@@ -68,9 +69,10 @@
 #define _PETRINET_
 #include "monetdb_config.h"
 #include "mal_interpreter.h"
+#include "sql_scenario.h"
 #include "basket.h"
 
-/* #define _DEBUG_PETRINET_*/
+/* #define _DEBUG_PETRINET_ */
 
 #define PNout GDKout 
 /*#define  _BASKET_SIZE_*/
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to