Changeset: 63082c33014d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=63082c33014d
Modified Files:
	sql/backends/monet5/datacell/50_datacell.sql
	sql/backends/monet5/datacell/Makefile.ag
	sql/backends/monet5/datacell/datacell.mx
	sql/backends/monet5/datacell/opt_datacell.mx
	sql/backends/monet5/datacell/petrinet.mx
	sql/backends/monet5/datacell/receptor.mx
	sql/backends/monet5/datacell/sensor.mx
Branch: default
Log Message:
Receptors and scheduler The events are collected from the channel and retained in the basket. The datacell compiler takes the continuous query expressed as ordinary SQL and compiles it into a plan over baskets, i.e. tables living in the datacell schema. The petrinet scheduler organises the queries in a dependency graph and will execute them (as MAL functions) upon arrival of events. diffs (truncated from 870 to 300 lines): diff --git a/sql/backends/monet5/datacell/50_datacell.sql b/sql/backends/monet5/datacell/50_datacell.sql --- a/sql/backends/monet5/datacell/50_datacell.sql +++ b/sql/backends/monet5/datacell/50_datacell.sql @@ -48,11 +48,21 @@ create procedure datacell.query(sch string, proc string) external name datacell.query; +create procedure datacell.register(sch string, proc string) + external name datacell.register; + +-- scheduler activation create procedure datacell.prepare() external name datacell.prepare; create procedure datacell.finish() external name datacell.finish; +create procedure datacell.pause() + external name datacell.pause; + +create procedure datacell.resume() + external name datacell.resume; + create procedure datacell.dump() external name datacell.dump; diff --git a/sql/backends/monet5/datacell/Makefile.ag b/sql/backends/monet5/datacell/Makefile.ag --- a/sql/backends/monet5/datacell/Makefile.ag +++ b/sql/backends/monet5/datacell/Makefile.ag @@ -40,6 +40,7 @@ receptor.mx \ petrinet.mx \ dcoperator.mx \ + opt_datacell.mx \ datacell.mx \ emitter.mx @@ -72,6 +73,7 @@ emitter.mal \ petrinet.mal \ datacell.mal \ + opt_datacell.mal \ dcoperator.mal } diff --git a/sql/backends/monet5/datacell/datacell.mx b/sql/backends/monet5/datacell/datacell.mx --- a/sql/backends/monet5/datacell/datacell.mx +++ b/sql/backends/monet5/datacell/datacell.mx @@ -60,6 +60,15 @@ address DCquery comment "Add a new continuous query."; +pattern register(name:str,def:str, delay:int):void +address PNregistration +comment "Add a new continuous query to the scheduler with 
an +indicative maximum delady of scheduling."; + +pattern register(name:str,def:str):void +address PNregistration +comment "Add a new continuous query to the scheduler."; + pattern start() address DCstartScheduler comment "Convert the datacell schema to a stream processing infrastructure"; @@ -130,6 +139,9 @@ #endif @c #include "datacell.h" +#include "opt_datacell.h" +#include "sql_optimizer.h" + #ifdef WIN32 #include "winsock2.h" #endif @@ -260,8 +272,12 @@ str msg; InstrPtr p; Module scope; + lng clk = GDKusec(); (void) mb; + /* check if the argument denotes a procedure name */ + /* if so, get its definition to be compiled */ + msg = SQLstatementIntern(cntxt, def, nme, 0, 0); /* if ( msg ) return msg; */ @@ -275,7 +291,18 @@ setModuleId(p, putName("datacell",8)); setFunctionId(p, putName(nme,strlen(nme))); insertSymbol(scope,s); - PNanalysis(cntxt,s->def); + /* printFunction(cntxt->fdout, s->def, 0, LIST_MAL_STMT);*/ + /* optimize the code and register at scheduler */ + if ( msg == MAL_SUCCEED) { + OPTdatacellImplementation(cntxt,s->def,0,0); + addOptimizers(cntxt,s->def,0); + if ( msg == MAL_SUCCEED) + msg = optimizeMALBlock(cntxt,s->def); + if ( msg == MAL_SUCCEED) + msg = optimizerCheck(cntxt, s->def, "optimizer.datacell", 1, GDKusec() - clk, OPT_CHECK_ALL); + addtoMalBlkHistory(mb, "datacell"); + } + printFunction(cntxt->fdout, s->def, 0, LIST_MAL_STMT); return msg; } @@ -332,8 +359,6 @@ DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { int ret=0; - if ( DCinitialized != DCPAUSED) - throw(MAL,"datacell.pause","Datacell already running"); PNresumeScheduler(&ret); (void) cntxt; (void) mb; @@ -377,6 +402,7 @@ BSKTdump(ret); RCdump(); EMdump(); + PNdump(ret); return MAL_SUCCEED; } @} diff --git a/sql/backends/monet5/datacell/opt_datacell.mx b/sql/backends/monet5/datacell/opt_datacell.mx --- a/sql/backends/monet5/datacell/opt_datacell.mx +++ b/sql/backends/monet5/datacell/opt_datacell.mx @@ -39,6 +39,7 @@ #define OPTDEBUGdatacell if 
(optDebug & (1 << DEBUG_OPT_DATACELL)) opt_export str OPTdatacell(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); +opt_export int OPTdatacellImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); #endif @- @@ -49,14 +50,23 @@ #include "opt_deadcode.h" #include "mal_interpreter.h" /* for showErrors() */ #include "mal_builder.h" +#include "basket.h" +#include "sql_optimizer.h" #include "opt_statistics.h" +#include "opt_dataflow.h" -static int +int OPTdatacellImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - int actions = 0; - int i, j,limit, vlimit, slimit; + int actions = 0, fnd; + int bskt, i, j, k, limit, vlimit, slimit; InstrPtr p, *old; + str schema, table; + int maxbasket= 32, m=0; + char *schemas[32], *tables[32]; + InstrPtr q[32]; + lng clk,t; + int *alias; (void) pci; @@ -72,31 +82,82 @@ if ( newMalBlkStmt(mb, slimit) < 0) return 0; - for (i = 0; i < limit; i++) { + alias = (int*) GDKzalloc(mb->vtop * 2 * sizeof(int)); + if ( alias == 0) + return 0; + removeDataflow(old,limit); + + for (i = 0; i < limit; i++) + if( old[i]) { p = old[i]; - if ( getModuleId(p) == sqlRef ){ - if( getFunctionId(p ) == bindRef && getVarConstant(mb, getArg(p, p->argc-1)).val.ival == 0) { - /* only the primary BAT is used */ - pushInstruction(mb, p); + if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef ||getFunctionId(p) == binddbatRef)){ + schema = getVarConstant(mb, getArg(p,2)).val.sval; + table = getVarConstant(mb, getArg(p,3)).val.sval; + bskt = BSKTlocate(schema,table); + if ( bskt ){ + for( j =0; j< m; j++) + if ( strcmp(schema,schemas[j])== 0 && strcmp(table,tables[j])==0) + break; + if ( j == m ){ + if ( m == maxbasket) return 0; + /* grab the basket tables by swapping the BATs used for the SQL table */ + q[m]= BSKTgrabInstruction(mb,schema,table); + schemas[m]= schema; + tables[m++]= table; + actions = 1; + } + } + + fnd = 0; + if ( getFunctionId(p) == bindRef && getVarConstant(mb, getArg(p, 
p->argc-1)).val.ival == 0 ){ + /* only the primary BAT is used and inject the call to empty the basket. */ + for( j = 0; fnd == 0 && j < baskets[bskt].colcount; j++) + if ( strcmp( baskets[bskt].cols[j], getVarConstant(mb, getArg(p,4)).val.sval) == 0){ + for( k =0; k< m; k++) + if ( strcmp(schema,schemas[k])== 0 && strcmp(table,tables[k])==0) + break; + alias[getArg(p,0)] = getArg(q[k],j); + fnd = 1; + break; + } + if ( fnd == 0) + pushInstruction(mb,p); + else + freeInstruction(p); continue; } - /* zap all expression arguments */ - clrFunction(p); - p->argc= p->retc; - for ( j=0; j< p->retc; j++ ) - p = pushArgument(mb, p, newTmpVariable(mb, getArgType(mb,p,j)) ); + if ( bskt) { + wrd rows = 0; + ValRecord vr; + /* zap all expression arguments */ + clrFunction(p); + p->argc= p->retc; + for ( j=0; j< p->retc; j++ ) + p = pushNil(mb,p,getArgType(mb,p,j)); + varSetProp(mb, p->argv[p->argc-1], rowsProp, op_eq, VALset(&vr, TYPE_wrd, &rows)); + } } + for ( j=0; j< p->argc; j++) + if( alias[getArg(p,j)] ) + getArg(p,j) = alias[getArg(p,j)]; pushInstruction(mb, p); } (void)stk; (void)pci; if (actions) + { addPipeDefinition("datacell_pipe", "inline,remap,datacell,evaluate,costModel,coercions,emptySet,aliases,mitosis," "mergetable,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow," "history,multiplex,accumulators,garbageCollector"); - return 0; + /* extend the plan with the new optimizer pipe required */ + clk= GDKusec(); + optimizerCheck(cntxt, mb, "optimizer.datacell", 1, t = (GDKusec() - clk), OPT_CHECK_ALL); + addtoMalBlkHistory(mb, "datacell"); + } + GDKfree(alias); + return actions; } /* #define _DEBUG_OPTIMIZER_*/ @@ -145,13 +206,17 @@ return MAL_SUCCEED; } actions = OPTdatacellImplementation(cntxt, mb, stk, p); - msg = optimizerCheck(cntxt, mb, "optimizer.datacell", actions, t = (GDKusec() - clk), OPT_CHECK_ALL); + addOptimizers(cntxt,mb,0); + if ( msg == MAL_SUCCEED) + msg = optimizeMALBlock(cntxt,mb); + if ( msg == MAL_SUCCEED) + msg = 
optimizerCheck(cntxt, mb, "optimizer.datacell", actions, t = (GDKusec() - clk), OPT_CHECK_ALL); OPTDEBUGdatacell { mnstr_printf(cntxt->fdout, "=FINISHED datacell %d\n", actions); printFunction(cntxt->fdout, mb, 0, LIST_MAL_STMT | LIST_MAPI); } DEBUGoptimizers - mnstr_printf(cntxt->fdout, "#opt_reduce: %d ms\n", t); + mnstr_printf(cntxt->fdout, "#opt_reduce: %d ms\n", t); QOTupdateStatistics("datacell", actions, t); addtoMalBlkHistory(mb, "datacell"); return msg; diff --git a/sql/backends/monet5/datacell/petrinet.mx b/sql/backends/monet5/datacell/petrinet.mx --- a/sql/backends/monet5/datacell/petrinet.mx +++ b/sql/backends/monet5/datacell/petrinet.mx @@ -32,7 +32,8 @@ pattern registration(mod:str,fcn:str, delay:int):void address PNregistration comment "Add a continous query to the Petri net. It will analyse -the MAL block to determine the input/output dependencies."; +the MAL block to determine the input/output dependencies. +The delay provides an indicative upperbound on how fast to react"; command source(mod:str,fcn:str, schema:str,tbl:str):void address PNsource @@ -68,9 +69,10 @@ #define _PETRINET_ #include "monetdb_config.h" #include "mal_interpreter.h" +#include "sql_scenario.h" #include "basket.h" -/* #define _DEBUG_PETRINET_*/ +/* #define _DEBUG_PETRINET_ */ #define PNout GDKout /*#define _BASKET_SIZE_*/ _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list