Changeset: b6f92f25fc4d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b6f92f25fc4d
Modified Files:
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        sql/backends/monet5/cquery.mal
        sql/backends/monet5/sql_cat.c
        sql/backends/monet5/sql_cat.h
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sql_execute.c
        sql/backends/monet5/sql_statement.c
        sql/backends/monet5/sqlcatalog.mal
        sql/include/sql_relation.h
        sql/server/rel_psm.c
        sql/server/rel_schema.c
        sql/server/rel_semantic.c
        sql/server/sql_mvc.h
        sql/server/sql_parser.h
        sql/server/sql_parser.y
Branch: trails
Log Message:

Added stop, resume and pause all continuous queries. Resuming a single 
continuous query with either setting heartbeats, cycles or not is also possible


diffs (truncated from 513 to 300 lines):

diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -81,6 +81,7 @@ str subcountRef;
 str copyRef;
 str copy_fromRef;
 str export_tableRef;
+str change_cpRef;
 str count_no_nilRef;
 str crossRef;
 str createRef;
@@ -408,6 +409,7 @@ void optimizerInit(void)
        drop_roleRef = putName("drop_role");
        drop_userRef = putName("drop_user");
        drop_indexRef = putName("drop_index");
+       change_cpRef = putName("change_cp");
        drop_functionRef = putName("drop_function");
        drop_triggerRef = putName("drop_trigger");
        mergecandRef= putName("mergecand");
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -110,6 +110,7 @@ mal_export  str drop_userRef;
 mal_export  str drop_roleRef;
 mal_export  str drop_userRef;
 mal_export  str drop_indexRef;
+mal_export  str change_cpRef;
 mal_export  str drop_functionRef;
 mal_export  str drop_triggerRef;
 mal_export  str subdiffRef;
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -34,7 +34,10 @@ the MAL block to determine the input/out
 
 pattern resume(mod:str, fcn:str)
 address CQresume
-comment "Activate a specific continuous query";
+comment "Activate a specific continuous query with no changes";
+pattern resumenoalter(mod:str, fcn:str)
+address CQresumeNoAlter
+comment "Activate a specific continuous query with changes to the cycles and 
heartbeat values";
 pattern resume()
 address CQresumeAll
 comment "Activate all continuous queries";
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -24,6 +24,7 @@
 #include "querylog.h"
 #include "mal_builder.h"
 #include "mal_debugger.h"
+#include "sql_cquery.h"
 
 #include <rel_select.h>
 #include <rel_optimizer.h>
@@ -380,6 +381,21 @@ drop_index(Client cntxt, mvc *sql, char 
 }
 
 static str
+change_cp(Client cntxt, int action)
+{
+       switch(action) {
+               case mod_stop_all_continuous:
+                       return CQderegisterAll(cntxt, NULL, 0, 0);
+               case mod_pause_all_continuous:
+                       return CQpauseAll(cntxt, NULL, 0, 0);
+               case mod_resume_all_continuous:
+                       return CQresumeAll(cntxt, NULL, 0, 0);
+               default:
+                       return sql_message("42000!ALL CONTINUOUS: Unknown 
option");
+       }
+}
+
+static str
 create_seq(mvc *sql, char *sname, char *seqname, sql_sequence *seq)
 {
        sql_schema *s = NULL;
@@ -1188,6 +1204,17 @@ SQLdrop_index(Client cntxt, MalBlkPtr mb
 }
 
 str
+SQLchange_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{      mvc *sql = NULL;
+       str msg;
+       int action = *getArgReference_int(stk, pci, 1);
+
+       initcontext();
+       msg = change_cp(cntxt, action);
+       return msg;
+}
+
+str
 SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) 
 {      mvc *sql = NULL;
        str msg;
diff --git a/sql/backends/monet5/sql_cat.h b/sql/backends/monet5/sql_cat.h
--- a/sql/backends/monet5/sql_cat.h
+++ b/sql/backends/monet5/sql_cat.h
@@ -56,6 +56,7 @@ sql5_export str SQLrename_user(Client cn
 sql5_export str SQLcreate_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
 sql5_export str SQLdrop_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
 sql5_export str SQLdrop_index(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
+sql5_export str SQLchange_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
 sql5_export str SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
 sql5_export str SQLcreate_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
 sql5_export str SQLcreate_trigger(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci) ;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -488,7 +488,6 @@ CQprocedure(Client cntxt, MalBlkPtr mb, 
        int i;
        char name[IDLENGTH];
 
-
        /* check existing of the pre-compiled and activated function */
        sch = *getArgReference_str(stk, pci, 1);
        nme = *getArgReference_str(stk, pci, 2);
@@ -565,7 +564,6 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        int i, cycles = sqlcontext ? sqlcontext->cycles : int_nil, heartbeats = 
sqlcontext ? sqlcontext->heartbeats : 1;
 
        (void) pci;
-       (void) stk;
 
        if(cycles <= 0 && cycles != int_nil){
                msg = createException(SQL,"cquery.register","The cycles value 
must be positive");
@@ -633,6 +631,55 @@ finish:
 }
 
 static str
+CQresumeInternal(Client cntxt, str modnme, str fcnnme, int with_alter)
+{
+       mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
+       str msg = MAL_SUCCEED;
+       int idx, cycles, heartbeats;
+
+       MT_lock_set(&ttrLock);
+
+       if(with_alter) {
+               cycles = sqlcontext ? sqlcontext->cycles : int_nil;
+               heartbeats = sqlcontext ? sqlcontext->heartbeats : 1;
+               if(cycles <= 0 && cycles != int_nil){
+                       msg = createException(SQL,"cquery.resume","The cycles 
value must be positive");
+                       goto finish;
+               }
+               if(heartbeats <= 0){
+                       msg = createException(SQL,"cquery.resume","The 
heartbeats value must be positive");
+                       goto finish;
+               }
+       }
+
+       idx = CQlocate(modnme, fcnnme);
+       if( idx == pnettop) {
+               msg = createException(SQL, "cquery.resume", "Continuous 
procedure %s.%s not accessible\n", modnme, fcnnme);
+               goto finish;
+       }
+       if( pnet[idx].status != CQPAUSE)
+               goto finish;
+
+#ifdef DEBUG_CQUERY
+       fprintf(stderr, "#resume scheduler\n");
+#endif
+       pnet[idx].status = CQWAIT;
+       if(with_alter) {
+               pnet[idx].cycles = cycles;
+               pnet[idx].beats = heartbeats * 1000;
+       }
+
+       /* start the scheduler if needed */
+       if(CQinit == 0) {
+               msg = CQstartScheduler();
+       }
+
+finish:
+       MT_lock_unset(&ttrLock);
+       return msg;
+}
+
+static str
 CQresumeInternalRanges(int first, int last)
 {
        str msg = MAL_SUCCEED;
@@ -650,26 +697,6 @@ CQresumeInternalRanges(int first, int la
        return msg;
 }
 
-static str
-CQresumeInternal(str modnme, str fcnnme)
-{
-       int idx;
-       str msg = MAL_SUCCEED;
-
-       MT_lock_set(&ttrLock);
-       idx = CQlocate(modnme, fcnnme);
-       if( idx == pnettop) {
-               msg = createException(SQL, "cquery.resume", "Continuous 
procedure %s.%s not accessible\n", modnme, fcnnme);
-               goto finish;
-       }
-       if( pnet[idx].status != CQPAUSE)
-               goto finish;
-       msg = CQresumeInternalRanges(idx, idx+1);
-finish:
-       MT_lock_unset(&ttrLock);
-       return msg;
-}
-
 str
 CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -689,7 +716,6 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
 {
        int i, k =-1;
        InstrPtr q;
-       (void) cntxt;
        (void) stk;
        (void) pci;
 
@@ -704,7 +730,30 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
                }
        }
        if( k >= 0 )
-               return CQresumeInternal(getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k)));
+               return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k)), 1);
+       throw(SQL,"cquery.resume","Continuous query not found ");
+}
+
+str
+CQresumeNoAlter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int i, k =-1;
+       InstrPtr q;
+       (void) stk;
+       (void) pci;
+
+       for( i=1; i < mb->stop; i++){
+               q = getInstrPtr(mb,i);
+
+               if( q->token == ENDsymbol )
+                       break;
+               if( getModuleId(q) == userRef){
+                       k = i;
+                       break;
+               }
+       }
+       if( k >= 0 )
+               return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k)), 0);
        throw(SQL,"cquery.resume","Continuous query not found ");
 }
 
@@ -1026,7 +1075,7 @@ CQexecute( Client cntxt, int idx)
 
        // release all locks held
 #ifdef DEBUG_CQUERY
-               fprintf(stderr, "#cquery.execute %s.%s finised %s\n", 
node->mod, node->fcn, (msg?msg:""));
+               fprintf(stderr, "#cquery.execute %s.%s finished %s\n", 
node->mod, node->fcn, (msg?msg:""));
 #endif
        MT_lock_set(&ttrLock);
        if( node->status != CQSTOP)
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -25,10 +25,6 @@
 //#define DEBUG_CQUERY
 //#define DEBUG_CQUERY_SCHEDULER
 
-#define REGISTER_CQUERY           1
-#define REGISTER_AND_START_CQUERY 2
-#define RESTART_CQUERY            3
-
 #define CQINIT     0
 #define CQREGISTER 1    /* being registered */
 #define CQWAIT            2    /* wait for data */
@@ -73,6 +69,7 @@ sql5_export MT_Lock ttrLock;
 sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 sql5_export str CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 sql5_export str CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+sql5_export str CQresumeNoAlter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 sql5_export str CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 sql5_export str CQpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 sql5_export str CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/backends/monet5/sql_execute.c 
b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -365,9 +365,13 @@ SQLrun(Client c, backend *be, mvc *m){
                                msg = CQpause(c,mb, 0,0);
                                break;
                        case mod_resume_continuous:
-                               //mnstr_printf(c->fdout, "#Resume continuous 
query\n");
+                               //mnstr_printf(c->fdout, "#Resume continuous 
query with alter\n");
                                msg = CQresume(c,mb, 0,0);
                                break;
+                       case mod_resume_continuous_no_alter:
+                               //mnstr_printf(c->fdout, "#Resume continuous 
query with no alter\n");
+                               msg = CQresumeNoAlter(c,mb, 0,0);
+                               break;
                        default:
                                msg = runMAL(c, mb, 0, 0);
                        }
diff --git a/sql/backends/monet5/sql_statement.c 
b/sql/backends/monet5/sql_statement.c
--- a/sql/backends/monet5/sql_statement.c
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to