Changeset: 2d92b97757bd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2d92b97757bd
Added Files:
        sql/backends/monet5/Tests/cfunction03.sql
Modified Files:
        sql/backends/monet5/Tests/All
        sql/backends/monet5/Tests/cfunction00.sql
        sql/backends/monet5/Tests/cfunction00.stable.out
        sql/backends/monet5/Tests/cfunction01.sql
        sql/backends/monet5/Tests/cfunction01.stable.out
        sql/backends/monet5/Tests/cfunction02.sql
        sql/backends/monet5/Tests/cfunction02.stable.out
        sql/backends/monet5/sql_basket.c
        sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:

Fix CQ exception handling


diffs (273 lines):

diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -115,6 +115,7 @@ cqstream03
 cfunction00
 cfunction01
 cfunction02
+cfunction03
 
 factory00
 factory01
diff --git a/sql/backends/monet5/Tests/cfunction00.sql b/sql/backends/monet5/Tests/cfunction00.sql
--- a/sql/backends/monet5/Tests/cfunction00.sql
+++ b/sql/backends/monet5/Tests/cfunction00.sql
@@ -10,7 +10,6 @@ begin
        end while;
        return s;
 END;
-select name from functions where name ='aggr00';
 
 -- to call a continuous function in the scheduler, we must pass the keyword "function" explicitly
 start continuous function aggr00();
diff --git a/sql/backends/monet5/Tests/cfunction00.stable.out b/sql/backends/monet5/Tests/cfunction00.stable.out
--- a/sql/backends/monet5/Tests/cfunction00.stable.out
+++ b/sql/backends/monet5/Tests/cfunction00.stable.out
@@ -37,12 +37,7 @@ Ready.
 #      end while;
 #      return s;
 #END;
-#select name from functions where name ='aggr00';
-% sys.functions # table_name
-% name # name
-% varchar # type
-% 6 # length
-[ "aggr00"     ]
+#start continuous function aggr00();
 #select aggr00(); #should return 1
 % .L2 # table_name
 % L2 # name
diff --git a/sql/backends/monet5/Tests/cfunction01.sql b/sql/backends/monet5/Tests/cfunction01.sql
--- a/sql/backends/monet5/Tests/cfunction01.sql
+++ b/sql/backends/monet5/Tests/cfunction01.sql
@@ -13,7 +13,6 @@ begin
        end while;
        return s;
 END;
-select * from functions where name ='aggr01';
 
 select result from tmp.aggr01; #error
 
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out b/sql/backends/monet5/Tests/cfunction01.stable.out
--- a/sql/backends/monet5/Tests/cfunction01.stable.out
+++ b/sql/backends/monet5/Tests/cfunction01.stable.out
@@ -79,12 +79,6 @@ Ready.
 #      end while;
 #      return s;
 #END;
-#select * from functions where name ='aggr01';
-% sys.functions,       sys.functions,  sys.functions,  sys.functions,  sys.functions,  sys.functions,  sys.functions,  sys.functions,  sys.functions,  sys.functions # table_name
-% id,  name,   func,   mod,    language,       type,   side_effect,    varres, vararg, schema_id # name
-% int, varchar,        varchar,        varchar,        int,    int,    boolean,        boolean,        boolean,        int # type
-% 4,   6,      175,    4,      1,      1,      5,      5,      5,      4 # length
-[ 8727,        "aggr01",       "create function aggr01() \nreturns integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s + (select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",   "user", 2,      1,      false,  false,  false,  2000    ]
 #start continuous function aggr01();
 #select result from tmp.aggr01; #should be empty
 % tmp.aggr01 # table_name
diff --git a/sql/backends/monet5/Tests/cfunction02.sql b/sql/backends/monet5/Tests/cfunction02.sql
--- a/sql/backends/monet5/Tests/cfunction02.sql
+++ b/sql/backends/monet5/Tests/cfunction02.sql
@@ -1,7 +1,7 @@
 -- Test a continuous function returning a table
-create table results3 (aa int, bb text);
+create table results2 (aa int, bb text);
 
-create function cfunc3(input text) returns table (aa integer, bb text) begin
+create function cfunc2(input text) returns table (aa integer, bb text) begin
     declare s int;
     set s = 0;
     while true do
@@ -10,22 +10,22 @@ create function cfunc3(input text) retur
     end while;
 end;
 
-start continuous function cfunc3('test') with heartbeat 1000 cycles 3;
+start continuous function cfunc2('test') with heartbeat 1000 cycles 3;
 
-pause continuous cfunc3;
+pause continuous cfunc2;
 
-create procedure cproc3() begin
-    insert into results3 (select aa, bb from tmp.cfunc3);
+create procedure cproc2() begin
+    insert into results2 (select aa, bb from tmp.cfunc2);
 end;
 
-start continuous procedure cproc3() with cycles 3;
+start continuous procedure cproc2() with cycles 2;
 
 call cquery.wait(4000);
 
-stop continuous cfunc3;
+stop continuous cfunc2;
 
-select aa, bb from results3;
+select aa, bb from results2;
 
-drop function cfunc3;
-drop procedure cproc3;
-drop table results3;
+drop function cfunc2;
+drop procedure cproc2;
+drop table results2;
diff --git a/sql/backends/monet5/Tests/cfunction02.stable.out b/sql/backends/monet5/Tests/cfunction02.stable.out
--- a/sql/backends/monet5/Tests/cfunction02.stable.out
+++ b/sql/backends/monet5/Tests/cfunction02.stable.out
@@ -39,19 +39,18 @@ Ready.
 #create procedure cproc3() begin
 #    insert into results3 (select aa, bb from tmp.cfunc3);
 #end;
-#start continuous procedure cproc3() with cycles 3;
-#stop continuous cfunc3;
-#select aa, bb from results3;
-% sys.results3,        sys.results3 # table_name
+#start continuous procedure cproc2() with cycles 2;
+#stop continuous cfunc2;
+#select aa, bb from results2;
+% sys.results2,        sys.results2 # table_name
 % aa,  bb # name
 % int, clob # type
 % 1,   4 # length
 [ 1,   "test"  ]
 [ 1,   "test"  ]
-[ 1,   "test"  ]
-#drop function cfunc3;
-#drop procedure cproc3;
-#drop table results3;
+#drop function cfunc2;
+#drop procedure cproc2;
+#drop table results2;
 
 # 17:04:58 >  
 # 17:04:58 >  "Done."
diff --git a/sql/backends/monet5/Tests/cfunction03.sql b/sql/backends/monet5/Tests/cfunction03.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -0,0 +1,18 @@
+-- Test continuous functions with limited runs
+create table results3 (aa time);
+
+create function cfunc3(input time) returns table (aa time) begin
+    while true do
+        yield table (select input);
+    end while;
+end;
+
+start continuous function cfunc3(time '15:00:00') with heartbeat 100 cycles 3;
+
+call cquery.wait(2000);
+
+select aa from time;
+
+drop function cfunc3;
+drop procedure cproc3;
+drop table results3;
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -723,9 +723,11 @@ BSKTerror(Client cntxt, MalBlkPtr mb, Ma
        if( idx <= 0)
                throw(SQL,"basket.error",SQLSTATE(3F000) "Stream table %s.%s not registered\n",sname,tname);
 
-       baskets[idx].error = GDKstrdup(error);
-       if(baskets[idx].error == NULL)
-               throw(SQL,"basket.error",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+       if(error) {
+               baskets[idx].error = GDKstrdup(error);
+               if(baskets[idx].error == NULL)
+                       throw(SQL,"basket.error",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+       }
        return MAL_SUCCEED;
 }
 
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -60,7 +60,6 @@ static int pnstatus = CQINIT;
 static int cycleDelay = 200; /* be careful, it affects response/throughput timings */
 static MT_Lock ttrLock;
 static MT_Id cq_pid = 0;
-static int CQ_counter = 0;
 
 static BAT *CQ_id_tick = 0;
 static BAT *CQ_id_alias = 0;
@@ -401,8 +400,6 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
 }
 
 #define FREE_CQ_MB(X)    \
-       if(cq_id)            \
-               GDKfree(cq_id);  \
        if(mb)               \
                freeMalBlk(mb);  \
        if(ralias)           \
@@ -420,15 +417,13 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
 str
 CQregister(Client cntxt, str sname, str fname, int argc, atom **args, str alias, int which, lng heartbeats, lng startat, int cycles)
 {
-       str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL;
+       str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL, raliasdup = NULL;
        InstrPtr p = NULL, q = NULL;
        Symbol sym;
        CQnode *pnew;
        MalBlkPtr mb = NULL, prev;
        const char* err_message = (which & mod_continuous_function) ? "function" : "procedure";
-       char* cq_id = NULL;
-       char buffer[IDLENGTH];
-       int i, idx, varid, cid, freeMB = 0, mvc_var = 0;
+       int i, idx, varid, freeMB = 0, mvc_var = 0;
        backend* be = (backend*) cntxt->sqlcontext;
        mvc *m = be->mvc;
        sql_schema *s = NULL, *tmp_schema = NULL;
@@ -549,22 +544,20 @@ CQregister(Client cntxt, str sname, str 
                }
        }
 
-       MT_lock_set(&ttrLock);
-       cid = ++CQ_counter;
-       MT_lock_unset(&ttrLock);
-       (void) snprintf(buffer, sizeof(buffer), "cq_%d", cid); //set the CQ ID
-       if((cq_id = GDKstrdup(buffer)) == NULL) {
-               CQ_MALLOC_FAIL(finish)
-       }
        if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
                CQ_MALLOC_FAIL(finish)
        }
-       if((q = newInstruction(NULL, "user", cq_id)) == NULL) {
+       if((raliasdup = GDKstrdup(ralias)) == NULL) {
+               CQ_MALLOC_FAIL(finish)
+       }
+       if((q = newInstruction(NULL, "tmp", raliasdup)) == NULL) {
+               GDKfree(raliasdup);
                CQ_MALLOC_FAIL(finish)
        }
        q->token = FUNCTIONsymbol;
        q->barrier = 0;
-       if((varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_void)) < 0) {
+       if((varid = newVariable(mb, ralias, strlen(ralias), TYPE_void)) < 0) {
+               freeInstruction(q);
                CQ_MALLOC_FAIL(finish)
        }
        setDestVar(q, varid);
@@ -652,8 +645,8 @@ CQregister(Client cntxt, str sname, str 
                q->barrier = CATCHsymbol;
 
                q = newStmt(mb,basketRef, errorRef);
-               q = pushStr(mb, q, "user");
-               q = pushStr(mb, q, cq_id);
+               q = pushStr(mb, q, "tmp");
+               q = pushStr(mb, q, ralias);
                q = pushArgument(mb, q, except_var);
 
                q = newAssignment(mb);
@@ -666,8 +659,8 @@ CQregister(Client cntxt, str sname, str 
                q->barrier = CATCHsymbol;
 
                q = newStmt(mb,basketRef, errorRef);
-               q = pushStr(mb, q, "user");
-               q = pushStr(mb, q, cq_id);
+               q = pushStr(mb, q, "tmp");
+               q = pushStr(mb, q, ralias);
                q = pushArgument(mb, q, except_var);
 
                q = newAssignment(mb);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to