Changeset: 15783c6371a0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=15783c6371a0
Added Files:
        sql/backends/monet5/Tests/cquery17.sql
        sql/backends/monet5/Tests/cquery17.stable.err
        sql/backends/monet5/Tests/cquery17.stable.out
Modified Files:
        sql/backends/monet5/Tests/All
        sql/backends/monet5/sql_cquery.c
        sql/server/rel_psm.c
        sql/server/sql_parser.y
Branch: trails
Log Message:

Explicitly set the CQ scheduler threadID as exited so it can be restarted in 
other and handle better CQ parameters casting.


diffs (truncated from 308 to 300 lines):

diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -102,6 +102,7 @@ cquery13
 cquery14
 cquery15
 cquery16
+cquery17
 
 cqstream00
 cqstream01
diff --git a/sql/backends/monet5/Tests/cquery17.sql 
b/sql/backends/monet5/Tests/cquery17.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cquery17.sql
@@ -0,0 +1,21 @@
+-- test multiple inserts from multiple SQL types
+create table testing17(aaa integer, bbb real, ccc text, ddd date, eee time, 
fff timestamp);
+
+create procedure cq_query17(aaa integer, bbb real, ccc text, ddd date, eee 
time, fff timestamp)
+begin
+       insert into testing17 values (aaa, bbb, ccc, ddd, eee, fff);
+end;
+
+start continuous cq_query17(null, null, null, null, null, null) with heartbeat 
1000 cycles 1;
+call cquery.wait(1800);
+
+start continuous cq_query17('-2', 132, 'abc', date '2017-01-01', time 
'12:34:56', timestamp '2007-03-07 15:28:16.577') with heartbeat 1000 cycles 1;
+call cquery.wait(1800);
+
+start continuous cq_query17(0, '-10.51', 'just another string', date 
'1980-12-13', time '17:35:58', timestamp '2009-02-09 15:00:00') with heartbeat 
1000 cycles 1;
+call cquery.wait(1800);
+
+select * from testing17;
+
+drop procedure cq_query17;
+drop table testing17;
diff --git a/sql/backends/monet5/Tests/cquery17.stable.err 
b/sql/backends/monet5/Tests/cquery17.stable.err
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cquery17.stable.err
@@ -0,0 +1,34 @@
+stderr of test 'cquery17` in directory 'sql/backends/monet5` itself:
+
+
+# 13:04:59 >  
+# 13:04:59 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=31913" "--set" 
"mapi_usock=/var/tmp/mtest-23287/.s.monetdb.31913" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5"
+# 13:04:59 >  
+
+# builtin opt  gdk_dbpath = 
/home/ferreira/MonetDB-trails/BUILD/var/monetdb5/dbfarm/demo
+# builtin opt  gdk_debug = 0
+# builtin opt  gdk_vmtrim = no
+# builtin opt  monet_prompt = >
+# builtin opt  monet_daemon = no
+# builtin opt  mapi_port = 50000
+# builtin opt  mapi_open = false
+# builtin opt  mapi_autosense = false
+# builtin opt  sql_optimizer = default_pipe
+# builtin opt  sql_debug = 0
+# cmdline opt  gdk_nr_threads = 0
+# cmdline opt  mapi_open = true
+# cmdline opt  mapi_port = 31913
+# cmdline opt  mapi_usock = /var/tmp/mtest-23287/.s.monetdb.31913
+# cmdline opt  monet_prompt = 
+# cmdline opt  gdk_dbpath = 
/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5
+# cmdline opt  gdk_debug = 536870922
+
+# 13:04:59 >  
+# 13:04:59 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-23287" "--port=31913"
+# 13:04:59 >  
+
+
+# 13:05:04 >  
+# 13:05:04 >  "Done."
+# 13:05:04 >  
+
diff --git a/sql/backends/monet5/Tests/cquery17.stable.out 
b/sql/backends/monet5/Tests/cquery17.stable.out
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cquery17.stable.out
@@ -0,0 +1,50 @@
+stdout of test 'cquery17` in directory 'sql/backends/monet5` itself:
+
+
+# 13:04:59 >  
+# 13:04:59 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=31913" "--set" 
"mapi_usock=/var/tmp/mtest-23287/.s.monetdb.31913" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5"
+# 13:04:59 >  
+
+# MonetDB 5 server v11.28.0
+# This is an unreleased version
+# Serving database 'mTests_sql_backends_monet5', using 8 threads
+# Compiled for x86_64-pc-linux-gnu/64bit with 128bit integers
+# Found 15.498 GiB available main-memory.
+# Copyright (c) 1993-July 2008 CWI.
+# Copyright (c) August 2008-2017 MonetDB B.V., all rights reserved
+# Visit https://www.monetdb.org/ for further information
+# Listening for connection requests on 
mapi:monetdb://dhcp-120.eduroam.cwi.nl:31913/
+# Listening for UNIX domain connection requests on 
mapi:monetdb:///var/tmp/mtest-23287/.s.monetdb.31913
+# MonetDB/GIS module loaded
+# MonetDB/SQL module loaded
+# MonetDB/Timetrails module loaded
+
+Ready.
+
+# 13:04:59 >  
+# 13:04:59 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-23287" "--port=31913"
+# 13:04:59 >  
+
+#create table testing17(aaa integer, bbb real, ccc text, ddd date, eee time, 
fff timestamp);
+#create procedure cq_query17(aaa integer, bbb real, ccc text, ddd date, eee 
time, fff timestamp)
+#begin
+#      insert into testing17 values (aaa, bbb, ccc, ddd, eee, fff);
+#end;
+#start continuous cq_query17(null, null, null, null, null, null) with 
heartbeat 1000 cycles 1;
+#start continuous cq_query17('-2', 132, 'abc', date '2017-01-01', time 
'12:34:56', timestamp '2007-03-07 15:28:16.577') with heartbeat 1000 cycles 1;
+#start continuous cq_query17(0, '-10.51', 'just another string', date 
'1980-12-13', time '17:35:58', timestamp '2009-02-09 15:00:00') with heartbeat 
1000 cycles 1;
+#select * from testing17;
+% sys.testing17,       sys.testing17,  sys.testing17,  sys.testing17,  
sys.testing17,  sys.testing17 # table_name
+% aaa, bbb,    ccc,    ddd,    eee,    fff # name
+% int, real,   clob,   date,   time,   timestamp # type
+% 2,   15,     19,     10,     8,      26 # length
+[ NULL,        NULL,   NULL,   NULL,   NULL,   NULL    ]
+[ -2,  132,    "abc",  2017-01-01,     12:34:56,       2007-03-07 
15:28:16.577000      ]
+[ 0,   -10.51, "just another string",  1980-12-13,     17:35:58,       
2009-02-09 15:00:00.000000      ]
+#drop procedure cq_query17;
+#drop table testing17;
+
+# 13:05:04 >  
+# 13:05:04 >  "Done."
+# 13:05:04 >  
+
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -554,12 +554,25 @@ CQregister(Client cntxt, str sname, str 
                sql_subtype tpe = ((sql_arg *) argn->data)->type;
                atom *a = args[i];
                ValPtr val = (ValPtr) &a->data;
-               if(VALconvert(tpe.type->localtype, val) == NULL) {
+               ValRecord dst;
+
+               dst.vtype = tpe.type->localtype;
+               /* use base tpe.type->localtype for user types */
+               if (val->vtype > TYPE_str)
+                       val->vtype = ATOMstorage(val->vtype);
+               if (dst.vtype > TYPE_str)
+                       dst.vtype = ATOMstorage(dst.vtype);
+               /* first convert into a new location */
+               if (VARconvert(&dst, val, 0) != GDK_SUCCEED) {
                        msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Error while making a 
conversion\n");
                        GDKfree(ralias);
                        freeMalBlk(mb);
                        goto finish;
                }
+               /* and finally copy the result */
+               *val = dst;
+               /* make sure we return the correct type (not the storage type) 
*/
+               val->vtype = tpe.type->localtype;
                if((p = pushValue(mb, p, val)) == NULL) {
                        FREE_CQ_MB(finish)
                }
@@ -1315,9 +1328,10 @@ CQscheduler(void *dummy)
        fprintf(stderr, "#cquery.scheduler stopped\n");
 #endif
        pnstatus = CQINIT;
-       MT_lock_unset(&ttrLock);
+       cq_pid = 0;
        SQLexitClient(cntxt);
        MCcloseClient(cntxt, CQ_CLIENT);
+       MT_lock_unset(&ttrLock);
 }
 
 str
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -448,7 +448,7 @@ rel_psm_return( mvc *sql, sql_subtype *r
        int is_last = 0;
        list *l = sa_list(sql->sa);
        char* psm_Err = token == SQL_RETURN ? "RETURN" : "YIELD";
-    char* psm_err = token == SQL_RETURN ? "return" : "yield";
+       char* psm_err = token == SQL_RETURN ? "return" : "yield";
 
        if (restypelist)
                ek.card = card_relation;
@@ -603,12 +603,12 @@ rel_start_continuous_query(mvc *sql, dno
        lng start_at = 0;
        str msg = NULL;
        list *cq_parameters = new_exp_list(sql->sa);
-       symbol *sym = w->next->next->data.sym;
+       symbol *sym = w->next->data.sym;
        dnode *l = sym->data.lval->h;
        char *sname = qname_schema(l->data.lval);
        char *fname = qname_fname(l->data.lval);
 
-       an = (AtomNode*) w->next->next->next->next->data.sym;
+       an = (AtomNode*) w->next->next->next->data.sym;
        if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != 
NULL){
                return sql_error(sql, 01, "%s", msg);
        }
@@ -616,11 +616,11 @@ rel_start_continuous_query(mvc *sql, dno
        append(cq_parameters, exp_atom_clob(sql->sa, fname)); //function name
        append(cq_parameters, exp_atom_int(sql->sa, sql->argc)); //args
        append(cq_parameters, exp_atom_ptr(sql->sa, sql->args)); //argv
-       append(cq_parameters, exp_atom_clob(sql->sa, 
w->next->next->next->next->next->next->data.sval)); //alias
-       append(cq_parameters, exp_atom_int(sql->sa, w->data.i_val)); //start 
query
-       append(cq_parameters, exp_atom_lng(sql->sa, 
w->next->next->next->data.l_val)); //heartbeats
+       append(cq_parameters, exp_atom_clob(sql->sa, 
w->next->next->next->next->next->data.sval)); //alias
+       append(cq_parameters, exp_atom_int(sql->sa, w->data.i_val)); //start 
query, procedure or function?
+       append(cq_parameters, exp_atom_lng(sql->sa, 
w->next->next->data.l_val)); //heartbeats
        append(cq_parameters, exp_atom_lng(sql->sa, start_at)); //start at value
-       append(cq_parameters, exp_atom_int(sql->sa, 
w->next->next->next->next->next->data.i_val)); //cycles
+       append(cq_parameters, exp_atom_int(sql->sa, 
w->next->next->next->next->data.i_val)); //cycles
 
        rel = rel_create(sql->sa);
        rel->l = NULL;
@@ -637,16 +637,13 @@ static sql_rel *
 rel_single_continuous_query(mvc *sql, dnode *w) {
        sql_rel *rel;
        list *exps;
-       int action = 0;
+       int action = w->data.i_val; //pause, resume or stop query? procedure or 
function?
        AtomNode* an;
        lng start_at_parsed = 0;
        str msg = NULL;
 
-       action |= w->data.i_val; /* pause, resume or stop query? */
-       action |= w->next->data.i_val; /* procedure or function? */
-
        if(action & mod_resume_continuous) {
-               an = (AtomNode*) w->next->next->next->next->data.sym;
+               an = (AtomNode*) w->next->next->next->data.sym;
                if(an){
                        if((msg = convert_atom_into_unix_timestamp(an->a, 
&start_at_parsed)) != NULL)
                                return sql_error(sql, 02, "%s", msg);
@@ -656,12 +653,12 @@ rel_single_continuous_query(mvc *sql, dn
        rel = rel_create(sql->sa);
        exps = new_exp_list(sql->sa);
 
-       append(exps, exp_atom_clob(sql->sa, w->next->next->data.sval)); //alias
+       append(exps, exp_atom_clob(sql->sa, w->next->data.sval)); //alias
        append(exps, exp_atom_int(sql->sa, action));
        if(action & mod_resume_continuous) {
-               append(exps, exp_atom_lng(sql->sa, 
w->next->next->next->data.l_val)); //heartbeats
+               append(exps, exp_atom_lng(sql->sa, w->next->next->data.l_val)); 
//heartbeats
                append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start 
at value
-               append(exps, exp_atom_int(sql->sa, 
w->next->next->next->next->next->data.i_val)); //cycles
+               append(exps, exp_atom_int(sql->sa, 
w->next->next->next->next->data.i_val)); //cycles
        } else {
                append(exps, exp_atom_lng(sql->sa, 0));
                append(exps, exp_atom_lng(sql->sa, 0));
diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y
--- a/sql/server/sql_parser.y
+++ b/sql/server/sql_parser.y
@@ -2177,8 +2177,7 @@ cq_alias:
 continuous_query_statement:
        START_CONTINUOUS database_object func_ref WITH heartbeat_set 
begin_at_set cycles_set cq_alias
                { dlist *l = L();
-                 append_int( l, mod_start_continuous);
-                 append_int( l, $2);
+                 append_int( l, mod_start_continuous | $2);
                  append_symbol( l, $3);
                  append_lng( l, $5);
                  append_symbol( l, $6);
@@ -2187,8 +2186,7 @@ continuous_query_statement:
                  $$ = _symbol_create_list( SQL_START_CONTINUOUS_QUERY, l ); }
        | START_CONTINUOUS database_object func_ref cq_alias
                { dlist *l = L();
-                 append_int( l, mod_start_continuous);
-                 append_int( l, $2);
+                 append_int( l, mod_start_continuous | $2);
                  append_symbol( l, $3);
                  append_lng( l, DEFAULT_CP_HEARTBEAT);
                  append_symbol( l, NULL);
@@ -2197,20 +2195,17 @@ continuous_query_statement:
                  $$ = _symbol_create_list( SQL_START_CONTINUOUS_QUERY, l ); }
        | STOP_CONTINUOUS database_object ident
                { dlist *l = L();
-                 append_int( l, mod_stop_continuous);
-                 append_int( l, $2);
+                 append_int( l, mod_stop_continuous | $2);
                  append_string( l, $3);
                  $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); }
        | PAUSE_CONTINUOUS database_object ident
                { dlist *l = L();
-                 append_int( l, mod_pause_continuous);
-                 append_int( l, $2);
+                 append_int( l, mod_pause_continuous | $2);
                  append_string( l, $3);
                  $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); }
        | RESUME_CONTINUOUS database_object ident WITH heartbeat_set 
begin_at_set cycles_set
                { dlist *l = L();
-                 append_int( l, mod_resume_continuous);
-                 append_int( l, $2);
+                 append_int( l, mod_resume_continuous | $2);
                  append_string( l, $3);
                  append_lng( l, $5);
                  append_symbol( l, $6);
@@ -2218,8 +2213,7 @@ continuous_query_statement:
                  $$ = _symbol_create_list( SQL_CHANGE_CONTINUOUS_QUERY, l ); }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to