Changeset: 101c7a0ab76a for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=101c7a0ab76a Added Files: sql/backends/monet5/sql_timestamps.c sql/backends/monet5/sql_timestamps.h Modified Files: sql/backends/monet5/Makefile.ag sql/backends/monet5/sql_cquery.c sql/backends/monet5/sql_cquery.h sql/backends/monet5/sql_execute.c sql/backends/monet5/sql_scenario.c sql/include/sql_catalog.h sql/server/rel_psm.c sql/server/sql_mvc.h sql/server/sql_parser.y sql/server/sql_qc.c sql/server/sql_qc.h sql/server/sql_scan.c Branch: trails Log Message:
More advances in the delay of the beginning of the continuous query diffs (truncated from 659 to 300 lines): diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag --- a/sql/backends/monet5/Makefile.ag +++ b/sql/backends/monet5/Makefile.ag @@ -47,6 +47,7 @@ lib__sql = { sql_orderidx.c sql_orderidx.h \ sql_basket.c sql_basket.h \ sql_cquery.c sql_cquery.h \ + sql_timestamps.c sql_timestamps.h \ wlr.c wlr.h \ sql_rank.c sql_rank.h LIBS = ../../server/libsqlserver \ diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -45,6 +45,7 @@ #include "monetdb_config.h" #include "sql_optimizer.h" #include "sql_gencode.h" +#include "sql_timestamps.h" #include "sql_cquery.h" #include "sql_basket.h" #include "mal_builder.h" @@ -68,7 +69,7 @@ static BAT *CQ_id_stmt = 0; CQnode *pnet = 0; int pnetLimit = 0, pnettop = 0; -#define SET_HEARTBEATS(X) (X != NO_HEARTBEAT) ? X * 1000 : NO_HEARTBEAT /* minimal 1 ms */ +#define SET_HEARTBEATS(X) (X != HEARTBEAT_NIL) ? 
X * 1000 : HEARTBEAT_NIL /* minimal 1 ms */ #define ALL_ROOT_CHECK(cntxt, malcal, name) \ do { \ @@ -476,9 +477,10 @@ CQregister(Client cntxt, MalBlkPtr mb, M CQnode *pnew; backend *be = (backend *) cntxt->sqlcontext; mvc* sqlcontext; + AtomNode* start_atom = NULL; char* err_message = "procedure"; int i, j, is_function = 0, cycles = DEFAULT_CP_CYCLES; - lng heartbeats = DEFAULT_CP_HEARTBEAT; + lng heartbeats = DEFAULT_CP_HEARTBEAT, start_at_parsed = 0; (void) pci; @@ -487,18 +489,26 @@ CQregister(Client cntxt, MalBlkPtr mb, M if(sqlcontext->continuous & mod_continuous_function) err_message = "function"; cycles = sqlcontext->cycles; + start_atom = (AtomNode*) sqlcontext->startat_atom; heartbeats = sqlcontext->heartbeats; is_function = (sqlcontext->continuous & mod_continuous_function); } - if(cycles < 0 && cycles != NO_CYCLES){ + if(cycles < 0 && cycles != CYCLES_NIL){ msg = createException(SQL,"cquery.register",SQLSTATE(42000) "The cycles value must be non negative\n"); goto finish; } - if(heartbeats < 0 && heartbeats != NO_HEARTBEAT){ + if(heartbeats < 0 && heartbeats != HEARTBEAT_NIL){ msg = createException(SQL,"cquery.register",SQLSTATE(42000) "The heartbeats value must be non negative\n"); goto finish; } + if(start_atom && (msg = convert_atom_into_unix_timestamp(start_atom, &start_at_parsed)) != MAL_SUCCEED){ + goto finish; + } + /** + * We are using GDKusec() to check if the query is eligible to fire, so we have to convert into microseconds + */ + start_at_parsed *= 1000; if(is_function){ /* for functions we need to remove the sql.mvc instruction */ for(i = 1; i< mb->stop; i++){ @@ -555,7 +565,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M if((msg = CQanalysis(cntxt, s->def, pnettop)) != MAL_SUCCEED) { goto unlock; } - if(heartbeats != NO_HEARTBEAT) { + if(heartbeats != HEARTBEAT_NIL) { for(j=0; j < MAXSTREAMS && pnet[pnettop].baskets[j]; j++) { if(baskets[pnet[pnettop].baskets[j]].window != DEFAULT_TABLE_WINDOW) { msg = createException(SQL, "cquery.register", @@ 
-625,8 +635,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M } pnet[pnettop].cycles = cycles; pnet[pnettop].beats = SET_HEARTBEATS(heartbeats); - pnet[pnettop].startat = 0; - pnet[pnettop].run = 0; + pnet[pnettop].run = start_at_parsed; pnet[pnettop].seen = *timestamp_nil; pnet[pnettop].status = CQWAIT; pnettop++; @@ -645,7 +654,8 @@ CQresumeInternal(Client cntxt, MalBlkPtr { str msg = MAL_SUCCEED, mb2str = NULL; int idx = 0, j, cycles = DEFAULT_CP_CYCLES; - lng heartbeats = DEFAULT_CP_HEARTBEAT; + lng heartbeats = DEFAULT_CP_HEARTBEAT, start_at_parsed = 0; + AtomNode* start_atom = NULL; mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc; char* err_message = (sqlcontext && sqlcontext->continuous & mod_continuous_function) ? "function" : "procedure"; @@ -655,15 +665,20 @@ CQresumeInternal(Client cntxt, MalBlkPtr if(with_alter && sqlcontext) { cycles = sqlcontext->cycles; + start_atom = (AtomNode*) sqlcontext->startat_atom; heartbeats = sqlcontext->heartbeats; - if(cycles < 0 && cycles != NO_CYCLES){ + if(cycles < 0 && cycles != CYCLES_NIL){ msg = createException(SQL,"cquery.resume",SQLSTATE(42000) "The cycles value must be non negative\n"); goto finish; } - if(heartbeats < 0 && heartbeats != NO_HEARTBEAT){ + if(heartbeats < 0 && heartbeats != HEARTBEAT_NIL){ msg = createException(SQL,"cquery.resume",SQLSTATE(42000) "The heartbeats value must be non negative\n"); goto finish; } + if(start_atom && (msg = convert_atom_into_unix_timestamp(start_atom, &start_at_parsed)) != MAL_SUCCEED){ + goto finish; + } + start_at_parsed *= 1000; } MT_lock_set(&ttrLock); @@ -681,7 +696,7 @@ CQresumeInternal(Client cntxt, MalBlkPtr SQLSTATE(42000) "The continuous %s %s is already running\n", err_message, mb2str); goto unlock; } - if(with_alter && heartbeats != NO_HEARTBEAT) { + if(with_alter && heartbeats != HEARTBEAT_NIL) { for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) { if(baskets[pnet[idx].baskets[j]].window != DEFAULT_TABLE_WINDOW) { msg = createException(SQL, 
"cquery.resume", @@ -694,6 +709,8 @@ CQresumeInternal(Client cntxt, MalBlkPtr pnet[idx].status = CQWAIT; if(with_alter) { pnet[idx].cycles = cycles; + if(start_at_parsed > 0) + pnet[idx].run = start_at_parsed; pnet[idx].beats = SET_HEARTBEATS(heartbeats); } @@ -915,7 +932,7 @@ CQcycles(Client cntxt, MalBlkPtr mb, Mal #ifdef DEBUG_CQUERY fprintf(stderr, "#set cycles \n"); #endif - if(cycles < 0 && cycles != NO_CYCLES){ + if(cycles < 0 && cycles != CYCLES_NIL){ msg = createException(SQL,"cquery.cycles",SQLSTATE(42000) "The cycles value must be non negative\n"); goto finish; } @@ -968,7 +985,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, there_is_window_constraint = 1; } } - if(heartbeats != NO_HEARTBEAT && there_is_window_constraint) { + if(heartbeats != HEARTBEAT_NIL && there_is_window_constraint) { msg = createException(SQL, "cquery.heartbeat", SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n"); goto finish; @@ -1102,7 +1119,7 @@ CQdump(void *ret) for (i = 0; i < pnettop; i++) { fprintf(stderr, "#[%d]\t%s.%s %s ", i, pnet[i].mod, pnet[i].fcn, statusname[pnet[i].status]); fprintf(stderr, "beats="LLFMT" ", pnet[i].beats); - fprintf(stderr, "startat="LLFMT" ", pnet[i].startat); + fprintf(stderr, "run="LLFMT" ", pnet[i].run); fprintf(stderr, "cycles=%d ", pnet[i].cycles); if( pnet[i].inout[0]) fprintf(stderr, " streams "); @@ -1182,7 +1199,7 @@ CQscheduler(void *dummy) MT_lock_unset(&ttrLock); while( pnstatus != CQSTOP && ! GDKexiting()){ - /* Determine which continuous queries are eligble to run. + /* Determine which continuous queries are eligible to run. Collect latest statistics, note that we don't need a lock here, because the count need not be accurate to the usec. It will simply come back. 
We also only have to check the places that are marked @@ -1193,19 +1210,19 @@ CQscheduler(void *dummy) MT_lock_set(&ttrLock); // analysis should be done with exclusive access for (k = i = 0; i < pnettop; i++) if ( pnet[i].status == CQWAIT ){ - pnet[i].enabled = pnet[i].error == 0 && (pnet[i].cycles > 0 || pnet[i].cycles == NO_CYCLES); + pnet[i].enabled = pnet[i].error == 0 && (pnet[i].cycles > 0 || pnet[i].cycles == CYCLES_NIL); /* Queries are triggered by the heartbeat or all window constraints */ /* A heartbeat in combination with a window constraint is ambiguous */ /* At least one constraint should be set */ - if( pnet[i].beats == NO_HEARTBEAT && pnet[i].baskets[0] == 0) + if( pnet[i].beats == HEARTBEAT_NIL && pnet[i].baskets[0] == 0) pnet[i].enabled = 0; - if( pnet[i].enabled && (pnet[i].beats > 0 || pnet[i].startat != NO_STARTAT)){ - pnet[i].enabled = now >= pnet[i].run + pnet[i].beats + pnet[i].startat; + if( pnet[i].enabled && (pnet[i].beats > 0 || pnet[i].run > 0)) { + pnet[i].enabled = now >= pnet[i].run + (pnet[i].beats > 0 ? pnet[i].beats : 0); #ifdef DEBUG_CQUERY_SCHEDULER fprintf(stderr,"#beat %s.%s "LLFMT"("LLFMT") %s\n", pnet[i].mod, pnet[i].fcn, - pnet[i].run + pnet[i].beats + pnet[i].startat, now, (pnet[i].enabled? "enabled":"disabled")); + pnet[i].run + (pnet[i].beats > 0 ? pnet[i].beats : 0), now, (pnet[i].enabled? 
"enabled":"disabled")); #endif } @@ -1276,7 +1293,7 @@ CQscheduler(void *dummy) msg= createException(MAL,"petrinet.scheduler","Can not fork the thread"); } else */ - if( pnet[i].cycles != NO_CYCLES && pnet[i].cycles > 0) + if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0) pnet[i].cycles--; pnet[i].run = now; /* last executed */ pnet[i].time = GDKusec() - t; /* keep around in microseconds */ diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h --- a/sql/backends/monet5/sql_cquery.h +++ b/sql/backends/monet5/sql_cquery.h @@ -54,10 +54,9 @@ typedef struct { int cycles; /* limit the number of invocations before dying */ lng beats; /* heart beat stride for procedures activations -> must be in microseconds */ - lng startat; /* start at the CQ at that precise moment (UNIX timestamp) -> must be in microseconds */ + lng run; /* start at the CQ at that precise moment (UNIX timestamp) -> must be in microseconds */ MT_Id tid; /* Thread responsible */ - lng run; /* last executed relative to start of server */ timestamp seen; str error; lng time; diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c --- a/sql/backends/monet5/sql_execute.c +++ b/sql/backends/monet5/sql_execute.c @@ -333,6 +333,7 @@ SQLrun(Client c, backend *be, mvc *m) //set the cq parameters m->continuous = be->q->continuous; m->heartbeats = be->q->heartbeats; + m->startat_atom = be->q->startat_atom; m->cycles = be->q->cycles; break; } @@ -381,6 +382,7 @@ SQLrun(Client c, backend *be, mvc *m) } m->continuous = 0; m->heartbeats = DEFAULT_CP_HEARTBEAT; + m->startat_atom = NULL; m->cycles = DEFAULT_CP_CYCLES; // release the resources diff --git a/sql/backends/monet5/sql_scenario.c b/sql/backends/monet5/sql_scenario.c --- a/sql/backends/monet5/sql_scenario.c +++ b/sql/backends/monet5/sql_scenario.c @@ -1170,6 +1170,7 @@ SQLparser(Client c) escaped_q, m->continuous, m->heartbeats, + (AtomNode*) m->startat_atom, m->cycles); } GDKfree(q); diff --git 
a/sql/backends/monet5/sql_timestamps.c b/sql/backends/monet5/sql_timestamps.c new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/sql_timestamps.c @@ -0,0 +1,112 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V. + */ + +/* + * author: Pedro Ferreira, M. Kersten + * This module converts a MonetDB atom into a UNIX timestamp if the conversion is possible. An exception is thrown + * otherwise. + */ + +#include "sql_timestamps.h" +#include "sql.h" +#include "mtime.h" + +static int GetSQLTypeFromAtom(sql_subtype *sql_subtype) +{ + if (!sql_subtype) + return -1; + if (!sql_subtype->type) + return -1; + return sql_subtype->type->eclass; +} + +str +convert_atom_into_unix_timestamp(AtomNode *an, lng* res) +{ + atom *a = an->a; + str msg = MAL_SUCCEED; + *res = 0; + + if(a->isnull) { + throw(SQL,"sql.timestamp",SQLSTATE(42000) "The start at value cannot be null\n"); + } + switch (GetSQLTypeFromAtom(&a->tpe)) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list