Changeset: 15783c6371a0 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=15783c6371a0 Added Files: sql/backends/monet5/Tests/cquery17.sql sql/backends/monet5/Tests/cquery17.stable.err sql/backends/monet5/Tests/cquery17.stable.out Modified Files: sql/backends/monet5/Tests/All sql/backends/monet5/sql_cquery.c sql/server/rel_psm.c sql/server/sql_parser.y Branch: trails Log Message:
Explicitly mark the CQ scheduler thread ID as exited so that the scheduler can be restarted later, and handle the casting of CQ parameters better. diffs (truncated from 308 to 300 lines): diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All --- a/sql/backends/monet5/Tests/All +++ b/sql/backends/monet5/Tests/All @@ -102,6 +102,7 @@ cquery13 cquery14 cquery15 cquery16 +cquery17 cqstream00 cqstream01 diff --git a/sql/backends/monet5/Tests/cquery17.sql b/sql/backends/monet5/Tests/cquery17.sql new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/cquery17.sql @@ -0,0 +1,21 @@ +-- test multiple inserts from multiple SQL types +create table testing17(aaa integer, bbb real, ccc text, ddd date, eee time, fff timestamp); + +create procedure cq_query17(aaa integer, bbb real, ccc text, ddd date, eee time, fff timestamp) +begin + insert into testing17 values (aaa, bbb, ccc, ddd, eee, fff); +end; + +start continuous cq_query17(null, null, null, null, null, null) with heartbeat 1000 cycles 1; +call cquery.wait(1800); + +start continuous cq_query17('-2', 132, 'abc', date '2017-01-01', time '12:34:56', timestamp '2007-03-07 15:28:16.577') with heartbeat 1000 cycles 1; +call cquery.wait(1800); + +start continuous cq_query17(0, '-10.51', 'just another string', date '1980-12-13', time '17:35:58', timestamp '2009-02-09 15:00:00') with heartbeat 1000 cycles 1; +call cquery.wait(1800); + +select * from testing17; + +drop procedure cq_query17; +drop table testing17; diff --git a/sql/backends/monet5/Tests/cquery17.stable.err b/sql/backends/monet5/Tests/cquery17.stable.err new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/cquery17.stable.err @@ -0,0 +1,34 @@ +stderr of test 'cquery17` in directory 'sql/backends/monet5` itself: + + +# 13:04:59 > +# 13:04:59 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=31913" "--set" "mapi_usock=/var/tmp/mtest-23287/.s.monetdb.31913" "--set" "monet_prompt=" "--forcemito" 
"--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5" +# 13:04:59 > + +# builtin opt gdk_dbpath = /home/ferreira/MonetDB-trails/BUILD/var/monetdb5/dbfarm/demo +# builtin opt gdk_debug = 0 +# builtin opt gdk_vmtrim = no +# builtin opt monet_prompt = > +# builtin opt monet_daemon = no +# builtin opt mapi_port = 50000 +# builtin opt mapi_open = false +# builtin opt mapi_autosense = false +# builtin opt sql_optimizer = default_pipe +# builtin opt sql_debug = 0 +# cmdline opt gdk_nr_threads = 0 +# cmdline opt mapi_open = true +# cmdline opt mapi_port = 31913 +# cmdline opt mapi_usock = /var/tmp/mtest-23287/.s.monetdb.31913 +# cmdline opt monet_prompt = +# cmdline opt gdk_dbpath = /home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5 +# cmdline opt gdk_debug = 536870922 + +# 13:04:59 > +# 13:04:59 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" "--host=/var/tmp/mtest-23287" "--port=31913" +# 13:04:59 > + + +# 13:05:04 > +# 13:05:04 > "Done." +# 13:05:04 > + diff --git a/sql/backends/monet5/Tests/cquery17.stable.out b/sql/backends/monet5/Tests/cquery17.stable.out new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/cquery17.stable.out @@ -0,0 +1,50 @@ +stdout of test 'cquery17` in directory 'sql/backends/monet5` itself: + + +# 13:04:59 > +# 13:04:59 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=31913" "--set" "mapi_usock=/var/tmp/mtest-23287/.s.monetdb.31913" "--set" "monet_prompt=" "--forcemito" "--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5" +# 13:04:59 > + +# MonetDB 5 server v11.28.0 +# This is an unreleased version +# Serving database 'mTests_sql_backends_monet5', using 8 threads +# Compiled for x86_64-pc-linux-gnu/64bit with 128bit integers +# Found 15.498 GiB available main-memory. +# Copyright (c) 1993-July 2008 CWI. 
+# Copyright (c) August 2008-2017 MonetDB B.V., all rights reserved +# Visit https://www.monetdb.org/ for further information +# Listening for connection requests on mapi:monetdb://dhcp-120.eduroam.cwi.nl:31913/ +# Listening for UNIX domain connection requests on mapi:monetdb:///var/tmp/mtest-23287/.s.monetdb.31913 +# MonetDB/GIS module loaded +# MonetDB/SQL module loaded +# MonetDB/Timetrails module loaded + +Ready. + +# 13:04:59 > +# 13:04:59 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" "--host=/var/tmp/mtest-23287" "--port=31913" +# 13:04:59 > + +#create table testing17(aaa integer, bbb real, ccc text, ddd date, eee time, fff timestamp); +#create procedure cq_query17(aaa integer, bbb real, ccc text, ddd date, eee time, fff timestamp) +#begin +# insert into testing17 values (aaa, bbb, ccc, ddd, eee, fff); +#end; +#start continuous cq_query17(null, null, null, null, null, null) with heartbeat 1000 cycles 1; +#start continuous cq_query17('-2', 132, 'abc', date '2017-01-01', time '12:34:56', timestamp '2007-03-07 15:28:16.577') with heartbeat 1000 cycles 1; +#start continuous cq_query17(0, '-10.51', 'just another string', date '1980-12-13', time '17:35:58', timestamp '2009-02-09 15:00:00') with heartbeat 1000 cycles 1; +#select * from testing17; +% sys.testing17, sys.testing17, sys.testing17, sys.testing17, sys.testing17, sys.testing17 # table_name +% aaa, bbb, ccc, ddd, eee, fff # name +% int, real, clob, date, time, timestamp # type +% 2, 15, 19, 10, 8, 26 # length +[ NULL, NULL, NULL, NULL, NULL, NULL ] +[ -2, 132, "abc", 2017-01-01, 12:34:56, 2007-03-07 15:28:16.577000 ] +[ 0, -10.51, "just another string", 1980-12-13, 17:35:58, 2009-02-09 15:00:00.000000 ] +#drop procedure cq_query17; +#drop table testing17; + +# 13:05:04 > +# 13:05:04 > "Done." 
+# 13:05:04 > + diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -554,12 +554,25 @@ CQregister(Client cntxt, str sname, str sql_subtype tpe = ((sql_arg *) argn->data)->type; atom *a = args[i]; ValPtr val = (ValPtr) &a->data; - if(VALconvert(tpe.type->localtype, val) == NULL) { + ValRecord dst; + + dst.vtype = tpe.type->localtype; + /* use base tpe.type->localtype for user types */ + if (val->vtype > TYPE_str) + val->vtype = ATOMstorage(val->vtype); + if (dst.vtype > TYPE_str) + dst.vtype = ATOMstorage(dst.vtype); + /* first convert into a new location */ + if (VARconvert(&dst, val, 0) != GDK_SUCCEED) { msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Error while making a conversion\n"); GDKfree(ralias); freeMalBlk(mb); goto finish; } + /* and finally copy the result */ + *val = dst; + /* make sure we return the correct type (not the storage type) */ + val->vtype = tpe.type->localtype; if((p = pushValue(mb, p, val)) == NULL) { FREE_CQ_MB(finish) } @@ -1315,9 +1328,10 @@ CQscheduler(void *dummy) fprintf(stderr, "#cquery.scheduler stopped\n"); #endif pnstatus = CQINIT; - MT_lock_unset(&ttrLock); + cq_pid = 0; SQLexitClient(cntxt); MCcloseClient(cntxt, CQ_CLIENT); + MT_lock_unset(&ttrLock); } str diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c --- a/sql/server/rel_psm.c +++ b/sql/server/rel_psm.c @@ -448,7 +448,7 @@ rel_psm_return( mvc *sql, sql_subtype *r int is_last = 0; list *l = sa_list(sql->sa); char* psm_Err = token == SQL_RETURN ? "RETURN" : "YIELD"; - char* psm_err = token == SQL_RETURN ? "return" : "yield"; + char* psm_err = token == SQL_RETURN ? 
"return" : "yield"; if (restypelist) ek.card = card_relation; @@ -603,12 +603,12 @@ rel_start_continuous_query(mvc *sql, dno lng start_at = 0; str msg = NULL; list *cq_parameters = new_exp_list(sql->sa); - symbol *sym = w->next->next->data.sym; + symbol *sym = w->next->data.sym; dnode *l = sym->data.lval->h; char *sname = qname_schema(l->data.lval); char *fname = qname_fname(l->data.lval); - an = (AtomNode*) w->next->next->next->next->data.sym; + an = (AtomNode*) w->next->next->next->data.sym; if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){ return sql_error(sql, 01, "%s", msg); } @@ -616,11 +616,11 @@ rel_start_continuous_query(mvc *sql, dno append(cq_parameters, exp_atom_clob(sql->sa, fname)); //function name append(cq_parameters, exp_atom_int(sql->sa, sql->argc)); //args append(cq_parameters, exp_atom_ptr(sql->sa, sql->args)); //argv - append(cq_parameters, exp_atom_clob(sql->sa, w->next->next->next->next->next->next->data.sval)); //alias - append(cq_parameters, exp_atom_int(sql->sa, w->data.i_val)); //start query - append(cq_parameters, exp_atom_lng(sql->sa, w->next->next->next->data.l_val)); //heartbeats + append(cq_parameters, exp_atom_clob(sql->sa, w->next->next->next->next->next->data.sval)); //alias + append(cq_parameters, exp_atom_int(sql->sa, w->data.i_val)); //start query, procedure or function? + append(cq_parameters, exp_atom_lng(sql->sa, w->next->next->data.l_val)); //heartbeats append(cq_parameters, exp_atom_lng(sql->sa, start_at)); //start at value - append(cq_parameters, exp_atom_int(sql->sa, w->next->next->next->next->next->data.i_val)); //cycles + append(cq_parameters, exp_atom_int(sql->sa, w->next->next->next->next->data.i_val)); //cycles rel = rel_create(sql->sa); rel->l = NULL; @@ -637,16 +637,13 @@ static sql_rel * rel_single_continuous_query(mvc *sql, dnode *w) { sql_rel *rel; list *exps; - int action = 0; + int action = w->data.i_val; //pause, resume or stop query? procedure or function? 
AtomNode* an; lng start_at_parsed = 0; str msg = NULL; - action |= w->data.i_val; /* pause, resume or stop query? */ - action |= w->next->data.i_val; /* procedure or function? */ - if(action & mod_resume_continuous) { - an = (AtomNode*) w->next->next->next->next->data.sym; + an = (AtomNode*) w->next->next->next->data.sym; if(an){ if((msg = convert_atom_into_unix_timestamp(an->a, &start_at_parsed)) != NULL) return sql_error(sql, 02, "%s", msg); @@ -656,12 +653,12 @@ rel_single_continuous_query(mvc *sql, dn rel = rel_create(sql->sa); exps = new_exp_list(sql->sa); - append(exps, exp_atom_clob(sql->sa, w->next->next->data.sval)); //alias + append(exps, exp_atom_clob(sql->sa, w->next->data.sval)); //alias append(exps, exp_atom_int(sql->sa, action)); if(action & mod_resume_continuous) { - append(exps, exp_atom_lng(sql->sa, w->next->next->next->data.l_val)); //heartbeats + append(exps, exp_atom_lng(sql->sa, w->next->next->data.l_val)); //heartbeats append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start at value - append(exps, exp_atom_int(sql->sa, w->next->next->next->next->next->data.i_val)); //cycles + append(exps, exp_atom_int(sql->sa, w->next->next->next->next->data.i_val)); //cycles } else { append(exps, exp_atom_lng(sql->sa, 0)); append(exps, exp_atom_lng(sql->sa, 0)); diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y --- a/sql/server/sql_parser.y +++ b/sql/server/sql_parser.y @@ -2177,8 +2177,7 @@ cq_alias: continuous_query_statement: START_CONTINUOUS database_object func_ref WITH heartbeat_set begin_at_set cycles_set cq_alias { dlist *l = L(); - append_int( l, mod_start_continuous); - append_int( l, $2); + append_int( l, mod_start_continuous | $2); append_symbol( l, $3); append_lng( l, $5); append_symbol( l, $6); @@ -2187,8 +2186,7 @@ continuous_query_statement: $$ = _symbol_create_list( SQL_START_CONTINUOUS_QUERY, l ); } | START_CONTINUOUS database_object func_ref cq_alias { dlist *l = L(); - append_int( l, mod_start_continuous); - append_int( 
l, $2); + append_int( l, mod_start_continuous | $2); append_symbol( l, $3); append_lng( l, DEFAULT_CP_HEARTBEAT); append_symbol( l, NULL); @@ -2197,20 +2195,17 @@ continuous_query_statement: $$ = _symbol_create_list( SQL_START_CONTINUOUS_QUERY, l ); } | STOP_CONTINUOUS database_object ident { dlist *l = L(); - append_int( l, mod_stop_continuous); - append_int( l, $2); + append_int( l, mod_stop_continuous | $2); append_string( l, $3); $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); } | PAUSE_CONTINUOUS database_object ident { dlist *l = L(); - append_int( l, mod_pause_continuous); - append_int( l, $2); + append_int( l, mod_pause_continuous | $2); append_string( l, $3); $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); } | RESUME_CONTINUOUS database_object ident WITH heartbeat_set begin_at_set cycles_set { dlist *l = L(); - append_int( l, mod_resume_continuous); - append_int( l, $2); + append_int( l, mod_resume_continuous | $2); append_string( l, $3); append_lng( l, $5); append_symbol( l, $6); @@ -2218,8 +2213,7 @@ continuous_query_statement: $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list