Changeset: b5cd1b3dadf7 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b5cd1b3dadf7 Modified Files: monetdb5/optimizer/opt_prelude.c monetdb5/optimizer/opt_prelude.h sql/backends/monet5/cquery.mal sql/backends/monet5/sql_cat.c sql/backends/monet5/sql_cat.h sql/backends/monet5/sql_cquery.c sql/backends/monet5/sql_cquery.h sql/backends/monet5/sql_execute.c sql/backends/monet5/sql_scenario.c sql/backends/monet5/sql_statement.c sql/backends/monet5/sqlcatalog.mal sql/include/sql_catalog.h sql/server/rel_psm.c sql/server/sql_mvc.h sql/server/sql_qc.c sql/server/sql_qc.h Branch: trails Log Message:
Major change. Removed CQ related variables from sql_mvc used during the compilation process. The use of these variables would cause problems while compiling UDFs with CQ calls. CQ registering is now properly compiled into a SQL statement. diffs (truncated from 707 to 300 lines): diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c --- a/monetdb5/optimizer/opt_prelude.c +++ b/monetdb5/optimizer/opt_prelude.c @@ -262,6 +262,7 @@ str sqlcatalogRef; str sqlRef; str startRef; str starttraceRef; +str start_cpRef; str stopRef; str stoptraceRef; str streamsRef; @@ -567,6 +568,7 @@ void optimizerInit(void) streamsRef = putName("streams"); startRef = putName("start"); starttraceRef = putName("starttrace"); + start_cpRef = putName("start_cp"); stopRef = putName("stop"); stoptraceRef = putName("stoptrace"); strRef = putName("str"); diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h --- a/monetdb5/optimizer/opt_prelude.h +++ b/monetdb5/optimizer/opt_prelude.h @@ -261,6 +261,7 @@ mal_export str sqlcatalogRef; mal_export str sqlRef; mal_export str startRef; mal_export str starttraceRef; +mal_export str start_cpRef; mal_export str stopRef; mal_export str stoptraceRef; mal_export str streamsRef; diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal --- a/sql/backends/monet5/cquery.mal +++ b/sql/backends/monet5/cquery.mal @@ -17,11 +17,6 @@ module cquery; -pattern register(mod:str, fcn:str) -address CQregister -comment "Add a continuous SQL procedure to the Petri-net scheduler. It will analyse -the MAL block to determine the input/output dependencies and firing conditions."; - pattern wait(cnt:int) address CQwait comment "Sleep for some time"; diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c --- a/sql/backends/monet5/sql_cat.c +++ b/sql/backends/monet5/sql_cat.c @@ -427,6 +427,12 @@ drop_index(Client cntxt, mvc *sql, char } static str +start_cp(Client cntxt, str alias, int action, lng heartbeat, lng startat, int cycles, MalBlkPtr fcall) +{ + return CQregister(cntxt, alias, action, heartbeat, startat, cycles, fcall); +} + +static str change_single_cp(str alias, int action, lng heartbeat, lng startat, int cycles) { if(action & mod_resume_continuous) { @@ -1300,7 +1306,23 @@ SQLdrop_index(Client cntxt, MalBlkPtr mb msg = drop_index(cntxt, sql, sname, iname); return msg; } -//alias:str, action:int, heartbeats:lng, startat:lng, cycles:int) + +str +SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ mvc *sql = NULL; + str msg; + str alias = *getArgReference_str(stk, pci, 1); + int action = *getArgReference_int(stk, pci, 2); + lng heartbeat = *getArgReference_lng(stk, pci, 3); + lng startat = *getArgReference_lng(stk, pci, 4); + int cycles = *getArgReference_int(stk, pci, 5); + MalBlkPtr fcall = *(MalBlkPtr*) getArgReference(stk, pci, 6); + + initcontext(); + msg = start_cp(cntxt, alias, action, heartbeat, startat, cycles, fcall); + return msg; +} + str SQLchange_single_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { mvc *sql = NULL; diff --git a/sql/backends/monet5/sql_cat.h b/sql/backends/monet5/sql_cat.h --- a/sql/backends/monet5/sql_cat.h +++ b/sql/backends/monet5/sql_cat.h @@ -56,6 +56,7 @@ sql5_export str SQLrename_user(Client cn sql5_export str SQLcreate_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLdrop_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLdrop_index(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; +sql5_export str SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str SQLchange_single_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str SQLchange_all_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -434,39 +434,14 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i * The actual function is called with the arguments provided in the call. */ str -CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci ) +CQregister(Client cntxt, str alias, int which, lng heartbeats, lng startat, int cycles, MalBlkPtr mb) { - str msg = MAL_SUCCEED, alias = NULL; - InstrPtr sig = getInstrPtr(mb,0),q; - MalBlkPtr other; + str msg = MAL_SUCCEED; + InstrPtr sig, q; Symbol s; CQnode *pnew; - backend *be = (backend *) cntxt->sqlcontext; - mvc* sqlcontext; - const char* err_message = "procedure"; - int i, j, is_function = 0, cycles = DEFAULT_CP_CYCLES, idx; - size_t ttlen = 0; - lng heartbeats = DEFAULT_CP_HEARTBEAT, startat = 0; - - (void) stk; - (void) pci; - - if(be){ - sqlcontext = be->mvc; - if(sqlcontext->continuous & mod_continuous_function) - err_message = "function"; - cycles = sqlcontext->cycles; - startat = sqlcontext->startat; - heartbeats = sqlcontext->heartbeats; - is_function = (sqlcontext->continuous & mod_continuous_function); - if(sqlcontext->cq_alias) { - alias = GDKstrdup(sqlcontext->cq_alias); - if( alias == NULL) { - msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); - goto finish; - } - } - } + const char* err_message = (which & mod_continuous_function) ? "function" : "procedure"; + int i, j, idx; if(cycles < 0 && cycles != CYCLES_NIL){ msg = createException(SQL,"cquery.register",SQLSTATE(42000) "The cycles value must be non negative\n"); @@ -481,7 +456,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M goto finish; } - if(is_function){ /* for functions we need to remove the sql.mvc instruction */ + if(which & mod_continuous_function){ /* for functions we need to remove the sql.mvc instruction */ for(i = 1; i< mb->stop; i++){ sig= getInstrPtr(mb,i); if( getFunctionId(sig) == mvcRef){ @@ -502,14 +477,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M goto finish; } - if(!alias) { - ttlen = strlen(getFunctionId(sig)) + 1; //plus the null character - alias = GDKmalloc(ttlen); - if( alias == NULL) { - msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); - goto finish; - } - snprintf(alias, ttlen, "%s", getFunctionId(sig)); + if(!alias || strcmp(alias, str_nil) == 0) { + alias = GDKstrdup(getFunctionId(sig)); + } else { + alias = GDKstrdup(alias); + } + if( alias == NULL) { + msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); + goto finish; } #ifdef DEBUG_CQUERY @@ -522,6 +497,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M if( pnew == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); + freeMalBlk(mb); goto unlock; } pnetLimit = INITIAL_MAXCQ; @@ -532,6 +508,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M if( pnew == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); + freeMalBlk(mb); goto unlock; } pnetLimit += INITIAL_MAXCQ; @@ -543,6 +520,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "The continuous %s %s is already registered.\n", err_message, alias); GDKfree(alias); + //freeMalBlk(mb); do not mess with caches!! goto unlock; } @@ -552,10 +530,12 @@ CQregister(Client cntxt, MalBlkPtr mb, M msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Cannot find %s %s.%s.\n", err_message, getModuleId(sig), getFunctionId(sig)); GDKfree(alias); + freeMalBlk(mb); goto unlock; } if((msg = CQanalysis(cntxt, s->def, pnettop)) != MAL_SUCCEED) { GDKfree(alias); + freeMalBlk(mb); goto unlock; } if(heartbeats != HEARTBEAT_NIL) { @@ -569,37 +549,31 @@ CQregister(Client cntxt, MalBlkPtr mb, M } } - other = copyMalBlk(mb); - if(other == NULL) { - msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); - GDKfree(alias); - goto unlock; - } - q = newStmt(other, sqlRef, transactionRef); + q = newStmt(mb, sqlRef, transactionRef); if(q == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); - freeMalBlk(other); + freeMalBlk(mb); goto unlock; } - setArgType(other,q, 0, TYPE_void); - moveInstruction(other, getPC(other,q),i); - q = newStmt(other, sqlRef, commitRef); + setArgType(mb,q, 0, TYPE_void); + moveInstruction(mb, getPC(mb,q),i); + q = newStmt(mb, sqlRef, commitRef); if(q == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); - freeMalBlk(other); + freeMalBlk(mb); goto unlock; } - setArgType(other,q, 0, TYPE_void); - moveInstruction(other, getPC(other,q),i+2); - chkProgram(cntxt->usermodule, other); + setArgType(mb,q, 0, TYPE_void); + moveInstruction(mb, getPC(mb,q),i+2); + chkProgram(cntxt->usermodule, mb); pnet[pnettop].mod = GDKstrdup(getModuleId(sig)); if(pnet[pnettop].mod == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); - freeMalBlk(other); + freeMalBlk(mb); goto unlock; } @@ -607,23 +581,23 @@ CQregister(Client cntxt, MalBlkPtr mb, M if(pnet[pnettop].fcn == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); - freeMalBlk(other); + freeMalBlk(mb); GDKfree(pnet[pnettop].mod); goto unlock; } - pnet[pnettop].stk = prepareMALstack(other, other->vsize); + pnet[pnettop].stk = prepareMALstack(mb, mb->vsize); if(pnet[pnettop].stk == NULL) { msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); GDKfree(alias); - freeMalBlk(other); + freeMalBlk(mb); GDKfree(pnet[pnettop].mod); GDKfree(pnet[pnettop].fcn); goto unlock; } pnet[pnettop].alias = alias; - pnet[pnettop].mb = other; + pnet[pnettop].mb = mb; pnet[pnettop].cycles = cycles; pnet[pnettop].beats = SET_HEARTBEATS(heartbeats); //subtract the beats value so the CQ will start at the precise moment diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h --- a/sql/backends/monet5/sql_cquery.h +++ b/sql/backends/monet5/sql_cquery.h @@ -69,8 +69,6 @@ sql5_export int pnetLimit, pnettop; sql5_export int CQlocateQueryExternal(str modname, str fcnname); sql5_export int CQlocateBasketExternal(str schname, str tblname); -sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); - sql5_export str CQwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list