Changeset: 8204adf3f30e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8204adf3f30e
Modified Files:
        sql/backends/monet5/Tests/cquery16.sql
        sql/backends/monet5/Tests/cquery16.stable.err
        sql/backends/monet5/sql_cat.c
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sql_statement.c
        sql/backends/monet5/sqlcatalog.mal
        sql/include/sql_catalog.h
        sql/include/sql_relation.h
        sql/server/rel_psm.c
Branch: trails
Log Message:

Compile the UDF and set up its parameters just before registering it in the Petri-net.


diffs (truncated from 685 to 300 lines):

diff --git a/sql/backends/monet5/Tests/cquery16.sql b/sql/backends/monet5/Tests/cquery16.sql
--- a/sql/backends/monet5/Tests/cquery16.sql
+++ b/sql/backends/monet5/Tests/cquery16.sql
@@ -4,11 +4,6 @@ create table results16 (a int);
 
 start continuous sys.cq_query16a(); --error
 
-create procedure cq_query16b() --error
-begin
-       start continuous sys.cq_query16a();
-end;
-
 create procedure cq_query16a()
 begin
        insert into results16 (select * from testing16);
diff --git a/sql/backends/monet5/Tests/cquery16.stable.err b/sql/backends/monet5/Tests/cquery16.stable.err
--- a/sql/backends/monet5/Tests/cquery16.stable.err
+++ b/sql/backends/monet5/Tests/cquery16.stable.err
@@ -31,15 +31,8 @@ stderr of test 'cquery16` in directory '
 
 MAPI  = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794
 QUERY = start continuous sys.cq_query16a(); --error
-ERROR = !SELECT: no such operator 'cq_query16a'
-CODE  = 42000
-MAPI  = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794
-QUERY = create procedure cq_query16b() --error
-        begin
-               start continuous sys.cq_query16a();
-        end;
-ERROR = !SELECT: no such operator 'cq_query16a'
-CODE  = 42000
+ERROR = !Failed to bind procedure sys.cq_query16a
+CODE  = 3F000
 
 # 15:37:38 >  
 # 15:37:38 >  "Done."
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -427,12 +427,6 @@ drop_index(Client cntxt, mvc *sql, char 
 }
 
 static str
-start_cp(Client cntxt, str alias, int action, lng heartbeat, lng startat, int 
cycles, MalBlkPtr fcall)
-{
-       return CQregister(cntxt, alias, action, heartbeat, startat, cycles, 
fcall);
-}
-
-static str
 change_single_cp(str alias, int action, lng heartbeat, lng startat, int cycles)
 {
        if(action & mod_resume_continuous) {
@@ -1312,15 +1306,18 @@ str
 SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      mvc *sql = NULL;
        str msg;
-       str alias = *getArgReference_str(stk, pci, 1);
-       int action = *getArgReference_int(stk, pci, 2);
-       lng heartbeat = *getArgReference_lng(stk, pci, 3);
-       lng startat = *getArgReference_lng(stk, pci, 4);
-       int cycles = *getArgReference_int(stk, pci, 5);
-       MalBlkPtr fcall = *(MalBlkPtr*) getArgReference(stk, pci, 6);
+       str sname = *getArgReference_str(stk, pci, 1);
+       str fname = *getArgReference_str(stk, pci, 2);
+       int argc = *getArgReference_int(stk, pci, 3);
+       atom **args = *(atom ***) getArgReference(stk, pci, 4);
+       str alias = *getArgReference_str(stk, pci, 5);
+       int action = *getArgReference_int(stk, pci, 6);
+       lng heartbeat = *getArgReference_lng(stk, pci, 7);
+       lng startat = *getArgReference_lng(stk, pci, 8);
+       int cycles = *getArgReference_int(stk, pci, 9);
 
        initcontext();
-       msg = start_cp(cntxt, alias, action, heartbeat, startat, cycles, fcall);
+       msg = CQregister(cntxt, sname, fname, argc, args, alias, action, 
heartbeat, startat, cycles);
        return msg;
 }
 
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -58,6 +58,7 @@ static int pnstatus = CQINIT;
 static int cycleDelay = 200; /* be careful, it affects response/throughput 
timings */
 static MT_Lock ttrLock;
 static MT_Id cq_pid = 0;
+static int CQ_counter = 0;
 
 static BAT *CQ_id_tick = 0;
 static BAT *CQ_id_mod = 0;
@@ -429,19 +430,37 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
        return msg;
 }
 
+#define FREE_CQ_MB(X)                                                          
   \
+       msg = createException(SQL,"cquery.register",SQLSTATE(HY001) 
MAL_MALLOC_FAIL); \
+       if(mb)                                                                  
      \
+               freeMalBlk(mb);                                                 
          \
+       if(ralias)                                                              
      \
+               GDKfree(ralias);                                                
          \
+       goto X;
+
 /* Every SQL statement is wrapped with a caller function that
  * regulates transaction bounds, debugger
  * The actual function is called with the arguments provided in the call.
  */
 str
-CQregister(Client cntxt, str alias, int which, lng heartbeats, lng startat, 
int cycles, MalBlkPtr mb)
+CQregister(Client cntxt, str sname, str fname, int argc, atom **args, str 
alias, int which, lng heartbeats, lng startat, int cycles)
 {
-       str msg = MAL_SUCCEED;
-       InstrPtr sig = NULL, q;
-       Symbol s;
+       str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL;
+       InstrPtr p = NULL, q = NULL;
+       Symbol sym;
        CQnode *pnew;
+       MalBlkPtr mb = NULL, prev;
        const char* err_message = (which & mod_continuous_function) ? 
"function" : "procedure";
-       int i, j, idx;
+       char* cq_id = NULL;
+       int i, idx, varid;
+       char buffer[IDLENGTH];
+       backend* be = (backend*) cntxt->sqlcontext;
+       mvc *m = be->mvc;
+       sql_schema *s;
+       sql_subfunc *f;
+       list *l;
+
+       prev = be->mb;
 
        if(cycles < 0 && cycles != CYCLES_NIL){
                msg = createException(SQL,"cquery.register",SQLSTATE(42000) 
"The cycles value must be non negative\n");
@@ -456,34 +475,132 @@ CQregister(Client cntxt, str alias, int 
                goto finish;
        }
 
-       if(which & mod_continuous_function){ /* for functions we need to remove 
the sql.mvc instruction */
-               for(i = 1; i< mb->stop; i++){
-                       sig= getInstrPtr(mb,i);
-                       if( getFunctionId(sig) == mvcRef){
-                               removeInstruction(mb, sig);
-                       }
+       if (!m->sa) {
+               if((m->sa = sa_create()) == NULL) {
+                       FREE_CQ_MB(finish)
+               }
+       }
+       if (!be->mb) {
+               if((be->mb = newMalBlk(8)) == NULL) {
+                       FREE_CQ_MB(finish)
                }
        }
 
-       /* extract the actual procedure/function call and check for duplicate */
-       for(i = 1; i< mb->stop; i++){
-               sig= getInstrPtr(mb,i);
-               if( getModuleId(sig) == userRef)
-                       break;
-       }
-       if( i == mb->stop){
-               msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Cannot detect %s call %s.%s.\n",
-                                                         err_message, 
getModuleId(sig), getFunctionId(sig));
+       rschema = (sname == NULL || strcmp(sname, str_nil) == 0) ? 
m->session->schema_name : sname;
+       if((s = mvc_bind_schema(m, rschema)) == NULL) { //bind the schema
+               msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Failed to bind schema %s\n", rschema);
                goto finish;
        }
 
-       if(!alias || strcmp(alias, str_nil) == 0) {
-               alias = GDKstrdup(getFunctionId(sig));
+       if((l = list_create(NULL)) == NULL) {
+               FREE_CQ_MB(finish)
+       }
+       for (i = 0; i < argc; i++) { //prepare the arguments for the backend 
creation
+               atom *a = args[i];
+               list_append(l, stmt_varnr(be, i, &a->tpe));
+       }
+       if(argc)
+               f = sql_find_func(m->sa, s, fname, argc, (which & 
mod_continuous_function) ? F_FUNC : F_PROC, NULL); //bind the UDF
+       else
+               f = sql_bind_func_(m->sa, s, fname, l, (which & 
mod_continuous_function) ? F_FUNC : F_PROC);
+       if(!f) {
+               msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Failed to bind %s %s.%s\n", err_message, sname, fname);
+               GDKfree(ralias);
+               list_destroy(l);
+               goto finish;
+       }
+       if (backend_create_subfunc(be, f, l) < 0) { //create the backend 
function
+               msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Failed to generate backend function\n");
+               GDKfree(ralias);
+               list_destroy(l);
+               goto finish;
+       }
+       list_destroy(l);
+
+       (void) snprintf(buffer, sizeof(buffer), "cq_%d", ++CQ_counter); //set 
the CQ ID
+       if((cq_id = GDKstrdup(buffer)) == NULL) {
+               FREE_CQ_MB(finish)
+       }
+       if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
+               GDKfree(cq_id);
+               FREE_CQ_MB(finish)
+       }
+       if((p = newInstruction(NULL, "user", cq_id)) == NULL) {
+               GDKfree(cq_id);
+               FREE_CQ_MB(finish)
+       }
+       p->token = FUNCTIONsymbol;
+       p->barrier = 0;
+       varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_any);
+       setDestVar(p, varid);
+       pushInstruction(mb, p);
+
+       for (i = 0; i < argc; i++) { //add variables to the MAL block
+               atom *a = args[i];
+               int type = atom_type(a)->type->localtype;
+               varid = 0;
+
+               (void) snprintf(buffer, sizeof(buffer), "A%d", i);
+               a->varid = varid = newVariable(mb, buffer, strlen(buffer), 
type);
+               if (varid < 0) {
+                       FREE_CQ_MB(finish)
+               }
+               if ((p = pushArgument(mb, p, varid)) == NULL) {
+                       FREE_CQ_MB(finish)
+               }
+               setVarType(mb, varid, type);
+               setVarUDFtype(mb, 0);
+       }
+       for (i = 0; i < argc; i++) { //add assignments for arguments
+               p = newAssignment(mb);
+               if (p && args[i]->varid >= 0) {
+                       p = pushArgument(mb, p, args[i]->varid);
+               } else if(p) {
+                       (void) snprintf(buffer, sizeof(buffer), "A%d", i);
+                       p = pushArgumentId(mb, p, buffer);
+               }
+               if (p == NULL) {
+                       FREE_CQ_MB(finish)
+               }
+       }
+       if ((p = newStmt(mb, "user", fname)) == NULL) { //add the UDF call 
statement
+               FREE_CQ_MB(finish)
+       }
+       /*if (f->res && list_length(f->res)) {
+               sql_subtype *res = f->res->h->data;
+               setVarType(mb, getArg(q, 0), res->type->localtype);
+               setVarUDFtype(mb, getArg(q, 0));
+       }*/
+       for (i = 0; i < argc; i++) { //add arguments assignments
+               if ((p = pushArgument(mb, p, i + 2)) == NULL) {
+                       FREE_CQ_MB(finish)
+               }
+       }
+       for (i = 0; i < argc; i++) { //initialize arguments assignments
+               atom *arg = args[i];
+               ValPtr val = (ValPtr) &arg->data;
+               if (VALcopy(&mb->var[i + 2].value, val) == NULL) {
+                       FREE_CQ_MB(finish)
+               }
+               setVarConstant(mb, i + 2);
+               setVarFixed(mb, i + 2);
+       }
+
+       if(!alias || strcmp(alias, str_nil) == 0) { //set the alias
+               if((ralias = GDKstrdup(fname)) == NULL) {
+                       FREE_CQ_MB(finish)
+               }
        } else {
-               alias = GDKstrdup(alias);
+        ralias = GDKstrdup(alias);
+       }
+       if(ralias == NULL) {
+               FREE_CQ_MB(finish)
        }
-       if( alias == NULL) {
-               msg = createException(SQL,"cquery.register",SQLSTATE(HY001) 
MAL_MALLOC_FAIL);
+       if ((sym = findSymbol(cntxt->usermodule, "user", fname)) == NULL){ // 
access the actual procedure body
+               msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Cannot find %s user.%s.\n", err_message, fname);
+               GDKfree(cq_id);
+               GDKfree(ralias);
+               freeMalBlk(mb);
                goto finish;
        }
 
@@ -495,10 +612,7 @@ CQregister(Client cntxt, str alias, int 
        if( pnet == 0){
                pnew = (CQnode *) GDKzalloc((INITIAL_MAXCQ) * sizeof(CQnode));
                if( pnew == NULL) {
-                       msg = 
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
-                       GDKfree(alias);
-                       freeMalBlk(mb);
-                       goto unlock;
+                       FREE_CQ_MB(unlock)
                }
                pnetLimit = INITIAL_MAXCQ;
                pnet = pnew;
@@ -506,97 +620,66 @@ CQregister(Client cntxt, str alias, int 
        if (pnettop == pnetLimit) {
                pnew = (CQnode *) GDKrealloc(pnet, (pnetLimit+INITIAL_MAXCQ) * 
sizeof(CQnode));
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to