Changeset: 101c7a0ab76a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=101c7a0ab76a
Added Files:
        sql/backends/monet5/sql_timestamps.c
        sql/backends/monet5/sql_timestamps.h
Modified Files:
        sql/backends/monet5/Makefile.ag
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sql_execute.c
        sql/backends/monet5/sql_scenario.c
        sql/include/sql_catalog.h
        sql/server/rel_psm.c
        sql/server/sql_mvc.h
        sql/server/sql_parser.y
        sql/server/sql_qc.c
        sql/server/sql_qc.h
        sql/server/sql_scan.c
Branch: trails
Log Message:

More advances in the delay of the beginning of the continuous query


diffs (truncated from 659 to 300 lines):

diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag
--- a/sql/backends/monet5/Makefile.ag
+++ b/sql/backends/monet5/Makefile.ag
@@ -47,6 +47,7 @@ lib__sql = {
                sql_orderidx.c sql_orderidx.h \
                sql_basket.c sql_basket.h \
                sql_cquery.c sql_cquery.h \
+               sql_timestamps.c sql_timestamps.h \
                wlr.c wlr.h \
                sql_rank.c sql_rank.h
        LIBS = ../../server/libsqlserver \
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -45,6 +45,7 @@
 #include "monetdb_config.h"
 #include "sql_optimizer.h"
 #include "sql_gencode.h"
+#include "sql_timestamps.h"
 #include "sql_cquery.h"
 #include "sql_basket.h"
 #include "mal_builder.h"
@@ -68,7 +69,7 @@ static BAT *CQ_id_stmt = 0;
 CQnode *pnet = 0;
 int pnetLimit = 0, pnettop = 0;
 
-#define SET_HEARTBEATS(X) (X != NO_HEARTBEAT) ? X * 1000 : NO_HEARTBEAT /* 
minimal 1 ms */
+#define SET_HEARTBEATS(X) (X != HEARTBEAT_NIL) ? X * 1000 : HEARTBEAT_NIL /* 
minimal 1 ms */
 
 #define ALL_ROOT_CHECK(cntxt, malcal, name)                                    
                                        \
        do {                                                                    
                                           \
@@ -476,9 +477,10 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        CQnode *pnew;
        backend *be = (backend *) cntxt->sqlcontext;
        mvc* sqlcontext;
+       AtomNode* start_atom = NULL;
        char* err_message = "procedure";
        int i, j, is_function = 0, cycles = DEFAULT_CP_CYCLES;
-       lng heartbeats = DEFAULT_CP_HEARTBEAT;
+       lng heartbeats = DEFAULT_CP_HEARTBEAT, start_at_parsed = 0;
 
        (void) pci;
 
@@ -487,18 +489,26 @@ CQregister(Client cntxt, MalBlkPtr mb, M
                if(sqlcontext->continuous & mod_continuous_function)
                        err_message = "function";
                cycles = sqlcontext->cycles;
+               start_atom = (AtomNode*) sqlcontext->startat_atom;
                heartbeats = sqlcontext->heartbeats;
                is_function = (sqlcontext->continuous & 
mod_continuous_function);
        }
 
-       if(cycles < 0 && cycles != NO_CYCLES){
+       if(cycles < 0 && cycles != CYCLES_NIL){
                msg = createException(SQL,"cquery.register",SQLSTATE(42000) 
"The cycles value must be non negative\n");
                goto finish;
        }
-       if(heartbeats < 0 && heartbeats != NO_HEARTBEAT){
+       if(heartbeats < 0 && heartbeats != HEARTBEAT_NIL){
                msg = createException(SQL,"cquery.register",SQLSTATE(42000) 
"The heartbeats value must be non negative\n");
                goto finish;
        }
+       if(start_atom && (msg = convert_atom_into_unix_timestamp(start_atom, 
&start_at_parsed)) != MAL_SUCCEED){
+               goto finish;
+       }
+       /**
+        * We are using GDKusec() to check if the query is eligible to fire, so we have to convert into microseconds
+        */
+       start_at_parsed *= 1000;
 
        if(is_function){ /* for functions we need to remove the sql.mvc 
instruction */
                for(i = 1; i< mb->stop; i++){
@@ -555,7 +565,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        if((msg = CQanalysis(cntxt, s->def, pnettop)) != MAL_SUCCEED) {
                goto unlock;
        }
-       if(heartbeats != NO_HEARTBEAT) {
+       if(heartbeats != HEARTBEAT_NIL) {
                for(j=0; j < MAXSTREAMS && pnet[pnettop].baskets[j]; j++) {
                        if(baskets[pnet[pnettop].baskets[j]].window != 
DEFAULT_TABLE_WINDOW) {
                                msg = createException(SQL, "cquery.register",
@@ -625,8 +635,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        }
        pnet[pnettop].cycles = cycles;
        pnet[pnettop].beats = SET_HEARTBEATS(heartbeats);
-       pnet[pnettop].startat = 0;
-       pnet[pnettop].run = 0;
+       pnet[pnettop].run = start_at_parsed;
        pnet[pnettop].seen = *timestamp_nil;
        pnet[pnettop].status = CQWAIT;
        pnettop++;
@@ -645,7 +654,8 @@ CQresumeInternal(Client cntxt, MalBlkPtr
 {
        str msg = MAL_SUCCEED, mb2str = NULL;
        int idx = 0, j, cycles = DEFAULT_CP_CYCLES;
-       lng heartbeats = DEFAULT_CP_HEARTBEAT;
+       lng heartbeats = DEFAULT_CP_HEARTBEAT, start_at_parsed = 0;
+       AtomNode* start_atom = NULL;
        mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
        char* err_message = (sqlcontext && sqlcontext->continuous & 
mod_continuous_function) ? "function" : "procedure";
 
@@ -655,15 +665,20 @@ CQresumeInternal(Client cntxt, MalBlkPtr
 
        if(with_alter && sqlcontext) {
                cycles = sqlcontext->cycles;
+               start_atom = (AtomNode*) sqlcontext->startat_atom;
                heartbeats = sqlcontext->heartbeats;
-               if(cycles < 0 && cycles != NO_CYCLES){
+               if(cycles < 0 && cycles != CYCLES_NIL){
                        msg = 
createException(SQL,"cquery.resume",SQLSTATE(42000) "The cycles value must be 
non negative\n");
                        goto finish;
                }
-               if(heartbeats < 0 && heartbeats != NO_HEARTBEAT){
+               if(heartbeats < 0 && heartbeats != HEARTBEAT_NIL){
                        msg = 
createException(SQL,"cquery.resume",SQLSTATE(42000) "The heartbeats value must 
be non negative\n");
                        goto finish;
                }
+               if(start_atom && (msg = 
convert_atom_into_unix_timestamp(start_atom, &start_at_parsed)) != MAL_SUCCEED){
+                       goto finish;
+               }
+               start_at_parsed *= 1000;
        }
 
        MT_lock_set(&ttrLock);
@@ -681,7 +696,7 @@ CQresumeInternal(Client cntxt, MalBlkPtr
                                                          SQLSTATE(42000) "The 
continuous %s %s is already running\n", err_message, mb2str);
                goto unlock;
        }
-       if(with_alter && heartbeats != NO_HEARTBEAT) {
+       if(with_alter && heartbeats != HEARTBEAT_NIL) {
                for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) {
                        if(baskets[pnet[idx].baskets[j]].window != 
DEFAULT_TABLE_WINDOW) {
                                msg = createException(SQL, "cquery.resume",
@@ -694,6 +709,8 @@ CQresumeInternal(Client cntxt, MalBlkPtr
        pnet[idx].status = CQWAIT;
        if(with_alter) {
                pnet[idx].cycles = cycles;
+               if(start_at_parsed > 0)
+                       pnet[idx].run = start_at_parsed;
                pnet[idx].beats = SET_HEARTBEATS(heartbeats);
        }
 
@@ -915,7 +932,7 @@ CQcycles(Client cntxt, MalBlkPtr mb, Mal
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#set cycles \n");
 #endif
-       if(cycles < 0 && cycles != NO_CYCLES){
+       if(cycles < 0 && cycles != CYCLES_NIL){
                msg = createException(SQL,"cquery.cycles",SQLSTATE(42000) "The 
cycles value must be non negative\n");
                goto finish;
        }
@@ -968,7 +985,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, 
                                there_is_window_constraint = 1;
                        }
                }
-               if(heartbeats != NO_HEARTBEAT && there_is_window_constraint) {
+               if(heartbeats != HEARTBEAT_NIL && there_is_window_constraint) {
                        msg = createException(SQL, "cquery.heartbeat",
                                                                  
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
                        goto finish;
@@ -1102,7 +1119,7 @@ CQdump(void *ret)
        for (i = 0; i < pnettop; i++) {
                fprintf(stderr, "#[%d]\t%s.%s %s ", i, pnet[i].mod, 
pnet[i].fcn, statusname[pnet[i].status]);
                fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
-               fprintf(stderr, "startat="LLFMT" ", pnet[i].startat);
+               fprintf(stderr, "run="LLFMT" ", pnet[i].run);
                fprintf(stderr, "cycles=%d ", pnet[i].cycles);
                if( pnet[i].inout[0])
                        fprintf(stderr, " streams ");
@@ -1182,7 +1199,7 @@ CQscheduler(void *dummy)
        MT_lock_unset(&ttrLock);
 
        while( pnstatus != CQSTOP  && ! GDKexiting()){
-               /* Determine which continuous queries are eligble to run.
+               /* Determine which continuous queries are eligible to run.
                   Collect latest statistics, note that we don't need a lock 
here,
                   because the count need not be accurate to the usec. It will 
simply
                   come back. We also only have to check the places that are 
marked
@@ -1193,19 +1210,19 @@ CQscheduler(void *dummy)
                MT_lock_set(&ttrLock); // analysis should be done with 
exclusive access
                for (k = i = 0; i < pnettop; i++)
                if ( pnet[i].status == CQWAIT ){
-                       pnet[i].enabled = pnet[i].error == 0 && (pnet[i].cycles 
> 0 || pnet[i].cycles == NO_CYCLES);
+                       pnet[i].enabled = pnet[i].error == 0 && (pnet[i].cycles 
> 0 || pnet[i].cycles == CYCLES_NIL);
 
                        /* Queries are triggered by the heartbeat or  all 
window constraints */
                        /* A heartbeat in combination with a window constraint 
is ambiguous */
                        /* At least one constraint should be set */
-                       if( pnet[i].beats == NO_HEARTBEAT && pnet[i].baskets[0] 
== 0)
+                       if( pnet[i].beats == HEARTBEAT_NIL && 
pnet[i].baskets[0] == 0)
                                pnet[i].enabled = 0;
 
-                       if( pnet[i].enabled && (pnet[i].beats > 0 || 
pnet[i].startat != NO_STARTAT)){
-                               pnet[i].enabled = now >= pnet[i].run + 
pnet[i].beats + pnet[i].startat;
+                       if( pnet[i].enabled && (pnet[i].beats > 0 || 
pnet[i].run > 0)) {
+                               pnet[i].enabled = now >= pnet[i].run + 
(pnet[i].beats > 0 ? pnet[i].beats : 0);
 #ifdef DEBUG_CQUERY_SCHEDULER
                                fprintf(stderr,"#beat %s.%s  "LLFMT"("LLFMT") 
%s\n", pnet[i].mod, pnet[i].fcn,
-                                       pnet[i].run + pnet[i].beats + 
pnet[i].startat, now, (pnet[i].enabled? "enabled":"disabled"));
+                                       pnet[i].run + (pnet[i].beats > 0 ? 
pnet[i].beats : 0), now, (pnet[i].enabled? "enabled":"disabled"));
 #endif
                        }
 
@@ -1276,7 +1293,7 @@ CQscheduler(void *dummy)
                                        msg= 
createException(MAL,"petrinet.scheduler","Can not fork the thread");
                                } else
 */
-                       if( pnet[i].cycles != NO_CYCLES && pnet[i].cycles > 0)
+                       if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0)
                                pnet[i].cycles--;
                        pnet[i].run = now;                              /* last 
executed */
                        pnet[i].time = GDKusec() - t;   /* keep around in 
microseconds */
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -54,10 +54,9 @@ typedef struct {
 
        int cycles;             /* limit the number of invocations before dying 
*/
        lng beats;              /* heart beat stride for procedures activations 
-> must be in microseconds */
-       lng startat;    /* start at the CQ at that precise moment (UNIX 
timestamp) -> must be in microseconds */
+       lng run;                /* start the CQ at that precise moment (UNIX timestamp) -> must be in microseconds */
 
        MT_Id   tid;    /* Thread responsible */
-       lng             run;    /* last executed relative to start of server */
        timestamp seen;
        str error;
        lng time;
diff --git a/sql/backends/monet5/sql_execute.c 
b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -333,6 +333,7 @@ SQLrun(Client c, backend *be, mvc *m)
                        //set the cq parameters
                        m->continuous = be->q->continuous;
                        m->heartbeats = be->q->heartbeats;
+                       m->startat_atom = be->q->startat_atom;
                        m->cycles = be->q->cycles;
                        break;
                }
@@ -381,6 +382,7 @@ SQLrun(Client c, backend *be, mvc *m)
        }
        m->continuous = 0;
        m->heartbeats = DEFAULT_CP_HEARTBEAT;
+       m->startat_atom = NULL;
        m->cycles = DEFAULT_CP_CYCLES;
 
        // release the resources
diff --git a/sql/backends/monet5/sql_scenario.c 
b/sql/backends/monet5/sql_scenario.c
--- a/sql/backends/monet5/sql_scenario.c
+++ b/sql/backends/monet5/sql_scenario.c
@@ -1170,6 +1170,7 @@ SQLparser(Client c)
                                                                  escaped_q,
                                                                  m->continuous,
                                                                  m->heartbeats,
+                                                                 (AtomNode*) 
m->startat_atom,
                                                                  m->cycles);
                        }
                        GDKfree(q);
diff --git a/sql/backends/monet5/sql_timestamps.c 
b/sql/backends/monet5/sql_timestamps.c
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/sql_timestamps.c
@@ -0,0 +1,112 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
+ */
+
+/*
+ * author: Pedro Ferreira, M. Kersten
+ * This module converts a MonetDB atom into a UNIX timestamp if the conversion is possible. An exception is thrown
+ * otherwise.
+ */
+
+#include "sql_timestamps.h"
+#include "sql.h"
+#include "mtime.h"
+
+static int GetSQLTypeFromAtom(sql_subtype *sql_subtype)
+{
+       if (!sql_subtype)
+               return -1;
+       if (!sql_subtype->type)
+               return -1;
+       return sql_subtype->type->eclass;
+}
+
+str
+convert_atom_into_unix_timestamp(AtomNode *an, lng* res)
+{
+       atom *a = an->a;
+       str msg = MAL_SUCCEED;
+       *res = 0;
+
+       if(a->isnull) {
+               throw(SQL,"sql.timestamp",SQLSTATE(42000) "The start at value 
cannot be null\n");
+       }
+       switch (GetSQLTypeFromAtom(&a->tpe)) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to