Changeset: b6f92f25fc4d for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b6f92f25fc4d Modified Files: monetdb5/optimizer/opt_prelude.c monetdb5/optimizer/opt_prelude.h sql/backends/monet5/cquery.mal sql/backends/monet5/sql_cat.c sql/backends/monet5/sql_cat.h sql/backends/monet5/sql_cquery.c sql/backends/monet5/sql_cquery.h sql/backends/monet5/sql_execute.c sql/backends/monet5/sql_statement.c sql/backends/monet5/sqlcatalog.mal sql/include/sql_relation.h sql/server/rel_psm.c sql/server/rel_schema.c sql/server/rel_semantic.c sql/server/sql_mvc.h sql/server/sql_parser.h sql/server/sql_parser.y Branch: trails Log Message:
Added stop, resume and pause all continuous queries. Resuming a single continuous query with either setting heartbeats, cycles or not is also possible diffs (truncated from 513 to 300 lines): diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c --- a/monetdb5/optimizer/opt_prelude.c +++ b/monetdb5/optimizer/opt_prelude.c @@ -81,6 +81,7 @@ str subcountRef; str copyRef; str copy_fromRef; str export_tableRef; +str change_cpRef; str count_no_nilRef; str crossRef; str createRef; @@ -408,6 +409,7 @@ void optimizerInit(void) drop_roleRef = putName("drop_role"); drop_userRef = putName("drop_user"); drop_indexRef = putName("drop_index"); + change_cpRef = putName("change_cp"); drop_functionRef = putName("drop_function"); drop_triggerRef = putName("drop_trigger"); mergecandRef= putName("mergecand"); diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h --- a/monetdb5/optimizer/opt_prelude.h +++ b/monetdb5/optimizer/opt_prelude.h @@ -110,6 +110,7 @@ mal_export str drop_userRef; mal_export str drop_roleRef; mal_export str drop_userRef; mal_export str drop_indexRef; +mal_export str change_cpRef; mal_export str drop_functionRef; mal_export str drop_triggerRef; mal_export str subdiffRef; diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal --- a/sql/backends/monet5/cquery.mal +++ b/sql/backends/monet5/cquery.mal @@ -34,7 +34,10 @@ the MAL block to determine the input/out pattern resume(mod:str, fcn:str) address CQresume -comment "Activate a specific continuous query"; +comment "Activate a specific continuous query with no changes"; +pattern resumenoalter(mod:str, fcn:str) +address CQresumeNoAlter +comment "Activate a specific continuous query with changes to the cycles and heartbeat values"; pattern resume() address CQresumeAll comment "Activate all continuous queries"; diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c --- a/sql/backends/monet5/sql_cat.c +++ b/sql/backends/monet5/sql_cat.c @@ -24,6 +24,7 @@ #include "querylog.h" #include "mal_builder.h" #include "mal_debugger.h" +#include "sql_cquery.h" #include <rel_select.h> #include <rel_optimizer.h> @@ -380,6 +381,21 @@ drop_index(Client cntxt, mvc *sql, char } static str +change_cp(Client cntxt, int action) +{ + switch(action) { + case mod_stop_all_continuous: + return CQderegisterAll(cntxt, NULL, 0, 0); + case mod_pause_all_continuous: + return CQpauseAll(cntxt, NULL, 0, 0); + case mod_resume_all_continuous: + return CQresumeAll(cntxt, NULL, 0, 0); + default: + return sql_message("42000!ALL CONTINUOUS: Unknown option"); + } +} + +static str create_seq(mvc *sql, char *sname, char *seqname, sql_sequence *seq) { sql_schema *s = NULL; @@ -1188,6 +1204,17 @@ SQLdrop_index(Client cntxt, MalBlkPtr mb } str +SQLchange_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ mvc *sql = NULL; + str msg; + int action = *getArgReference_int(stk, pci, 1); + + initcontext(); + msg = change_cp(cntxt, action); + return msg; +} + +str SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { mvc *sql = NULL; str msg; diff --git a/sql/backends/monet5/sql_cat.h b/sql/backends/monet5/sql_cat.h --- a/sql/backends/monet5/sql_cat.h +++ b/sql/backends/monet5/sql_cat.h @@ -56,6 +56,7 @@ sql5_export str SQLrename_user(Client cn sql5_export str SQLcreate_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLdrop_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLdrop_index(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; +sql5_export str SQLchange_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLcreate_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; sql5_export str SQLcreate_trigger(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) ; diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -488,7 +488,6 @@ CQprocedure(Client cntxt, MalBlkPtr mb, int i; char name[IDLENGTH]; - /* check existing of the pre-compiled and activated function */ sch = *getArgReference_str(stk, pci, 1); nme = *getArgReference_str(stk, pci, 2); @@ -565,7 +564,6 @@ CQregister(Client cntxt, MalBlkPtr mb, M int i, cycles = sqlcontext ? sqlcontext->cycles : int_nil, heartbeats = sqlcontext ? sqlcontext->heartbeats : 1; (void) pci; - (void) stk; if(cycles <= 0 && cycles != int_nil){ msg = createException(SQL,"cquery.register","The cycles value must be positive"); @@ -633,6 +631,55 @@ finish: } static str +CQresumeInternal(Client cntxt, str modnme, str fcnnme, int with_alter) +{ + mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc; + str msg = MAL_SUCCEED; + int idx, cycles, heartbeats; + + MT_lock_set(&ttrLock); + + if(with_alter) { + cycles = sqlcontext ? sqlcontext->cycles : int_nil; + heartbeats = sqlcontext ? sqlcontext->heartbeats : 1; + if(cycles <= 0 && cycles != int_nil){ + msg = createException(SQL,"cquery.resume","The cycles value must be positive"); + goto finish; + } + if(heartbeats <= 0){ + msg = createException(SQL,"cquery.resume","The heartbeats value must be positive"); + goto finish; + } + } + + idx = CQlocate(modnme, fcnnme); + if( idx == pnettop) { + msg = createException(SQL, "cquery.resume", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); + goto finish; + } + if( pnet[idx].status != CQPAUSE) + goto finish; + +#ifdef DEBUG_CQUERY + fprintf(stderr, "#resume scheduler\n"); +#endif + pnet[idx].status = CQWAIT; + if(with_alter) { + pnet[idx].cycles = cycles; + pnet[idx].beats = heartbeats * 1000; + } + + /* start the scheduler if needed */ + if(CQinit == 0) { + msg = CQstartScheduler(); + } + +finish: + MT_lock_unset(&ttrLock); + return msg; +} + +static str CQresumeInternalRanges(int first, int last) { str msg = MAL_SUCCEED; @@ -650,26 +697,6 @@ CQresumeInternalRanges(int first, int la return msg; } -static str -CQresumeInternal(str modnme, str fcnnme) -{ - int idx; - str msg = MAL_SUCCEED; - - MT_lock_set(&ttrLock); - idx = CQlocate(modnme, fcnnme); - if( idx == pnettop) { - msg = createException(SQL, "cquery.resume", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); - goto finish; - } - if( pnet[idx].status != CQPAUSE) - goto finish; - msg = CQresumeInternalRanges(idx, idx+1); -finish: - MT_lock_unset(&ttrLock); - return msg; -} - str CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -689,7 +716,6 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal { int i, k =-1; InstrPtr q; - (void) cntxt; (void) stk; (void) pci; @@ -704,7 +730,30 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal } } if( k >= 0 ) - return CQresumeInternal(getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); + return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)), 1); + throw(SQL,"cquery.resume","Continuous query not found "); +} + +str +CQresumeNoAlter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int i, k =-1; + InstrPtr q; + (void) stk; + (void) pci; + + for( i=1; i < mb->stop; i++){ + q = getInstrPtr(mb,i); + + if( q->token == ENDsymbol ) + break; + if( getModuleId(q) == userRef){ + k = i; + break; + } + } + if( k >= 0 ) + return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)), 0); throw(SQL,"cquery.resume","Continuous query not found "); } @@ -1026,7 +1075,7 @@ CQexecute( Client cntxt, int idx) // release all locks held #ifdef DEBUG_CQUERY - fprintf(stderr, "#cquery.execute %s.%s finised %s\n", node->mod, node->fcn, (msg?msg:"")); + fprintf(stderr, "#cquery.execute %s.%s finished %s\n", node->mod, node->fcn, (msg?msg:"")); #endif MT_lock_set(&ttrLock); if( node->status != CQSTOP) diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h --- a/sql/backends/monet5/sql_cquery.h +++ b/sql/backends/monet5/sql_cquery.h @@ -25,10 +25,6 @@ //#define DEBUG_CQUERY //#define DEBUG_CQUERY_SCHEDULER -#define REGISTER_CQUERY 1 -#define REGISTER_AND_START_CQUERY 2 -#define RESTART_CQUERY 3 - #define CQINIT 0 #define CQREGISTER 1 /* being registered */ #define CQWAIT 2 /* wait for data */ @@ -73,6 +69,7 @@ sql5_export MT_Lock ttrLock; sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +sql5_export str CQresumeNoAlter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str CQpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); sql5_export str CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c --- a/sql/backends/monet5/sql_execute.c +++ b/sql/backends/monet5/sql_execute.c @@ -365,9 +365,13 @@ SQLrun(Client c, backend *be, mvc *m){ msg = CQpause(c,mb, 0,0); break; case mod_resume_continuous: - //mnstr_printf(c->fdout, "#Resume continuous query\n"); + //mnstr_printf(c->fdout, "#Resume continuous query with alter\n"); msg = CQresume(c,mb, 0,0); break; + case mod_resume_continuous_no_alter: + //mnstr_printf(c->fdout, "#Resume continuous query with no alter\n"); + msg = CQresumeNoAlter(c,mb, 0,0); + break; default: msg = runMAL(c, mb, 0, 0); } diff --git a/sql/backends/monet5/sql_statement.c b/sql/backends/monet5/sql_statement.c --- a/sql/backends/monet5/sql_statement.c _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list