Changeset: 8204adf3f30e for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8204adf3f30e Modified Files: sql/backends/monet5/Tests/cquery16.sql sql/backends/monet5/Tests/cquery16.stable.err sql/backends/monet5/sql_cat.c sql/backends/monet5/sql_cquery.c sql/backends/monet5/sql_cquery.h sql/backends/monet5/sql_statement.c sql/backends/monet5/sqlcatalog.mal sql/include/sql_catalog.h sql/include/sql_relation.h sql/server/rel_psm.c Branch: trails Log Message:
Compile the UDF and setup parameters just before registering in the Petri-net. diffs (truncated from 685 to 300 lines): diff --git a/sql/backends/monet5/Tests/cquery16.sql b/sql/backends/monet5/Tests/cquery16.sql --- a/sql/backends/monet5/Tests/cquery16.sql +++ b/sql/backends/monet5/Tests/cquery16.sql @@ -4,11 +4,6 @@ create table results16 (a int); start continuous sys.cq_query16a(); --error -create procedure cq_query16b() --error -begin - start continuous sys.cq_query16a(); -end; - create procedure cq_query16a() begin insert into results16 (select * from testing16); diff --git a/sql/backends/monet5/Tests/cquery16.stable.err b/sql/backends/monet5/Tests/cquery16.stable.err --- a/sql/backends/monet5/Tests/cquery16.stable.err +++ b/sql/backends/monet5/Tests/cquery16.stable.err @@ -31,15 +31,8 @@ stderr of test 'cquery16` in directory ' MAPI = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794 QUERY = start continuous sys.cq_query16a(); --error -ERROR = !SELECT: no such operator 'cq_query16a' -CODE = 42000 -MAPI = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794 -QUERY = create procedure cq_query16b() --error - begin - start continuous sys.cq_query16a(); - end; -ERROR = !SELECT: no such operator 'cq_query16a' -CODE = 42000 +ERROR = !Failed to bind procedure sys.cq_query16a +CODE = 3F000 # 15:37:38 > # 15:37:38 > "Done." diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c --- a/sql/backends/monet5/sql_cat.c +++ b/sql/backends/monet5/sql_cat.c @@ -427,12 +427,6 @@ drop_index(Client cntxt, mvc *sql, char } static str -start_cp(Client cntxt, str alias, int action, lng heartbeat, lng startat, int cycles, MalBlkPtr fcall) -{ - return CQregister(cntxt, alias, action, heartbeat, startat, cycles, fcall); -} - -static str change_single_cp(str alias, int action, lng heartbeat, lng startat, int cycles) { if(action & mod_resume_continuous) { @@ -1312,15 +1306,18 @@ str SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { mvc *sql = NULL; str msg; - str alias = *getArgReference_str(stk, pci, 1); - int action = *getArgReference_int(stk, pci, 2); - lng heartbeat = *getArgReference_lng(stk, pci, 3); - lng startat = *getArgReference_lng(stk, pci, 4); - int cycles = *getArgReference_int(stk, pci, 5); - MalBlkPtr fcall = *(MalBlkPtr*) getArgReference(stk, pci, 6); + str sname = *getArgReference_str(stk, pci, 1); + str fname = *getArgReference_str(stk, pci, 2); + int argc = *getArgReference_int(stk, pci, 3); + atom **args = *(atom ***) getArgReference(stk, pci, 4); + str alias = *getArgReference_str(stk, pci, 5); + int action = *getArgReference_int(stk, pci, 6); + lng heartbeat = *getArgReference_lng(stk, pci, 7); + lng startat = *getArgReference_lng(stk, pci, 8); + int cycles = *getArgReference_int(stk, pci, 9); initcontext(); - msg = start_cp(cntxt, alias, action, heartbeat, startat, cycles, fcall); + msg = CQregister(cntxt, sname, fname, argc, args, alias, action, heartbeat, startat, cycles); return msg; } diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -58,6 +58,7 @@ static int pnstatus = CQINIT; static int cycleDelay = 200; /* be careful, it affects response/throughput timings */ static MT_Lock ttrLock; static MT_Id cq_pid = 0; +static int CQ_counter = 0; static BAT *CQ_id_tick = 0; static BAT *CQ_id_mod = 0; @@ -429,19 +430,37 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i return msg; } +#define FREE_CQ_MB(X) \ + msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); \ + if(mb) \ + freeMalBlk(mb); \ + if(ralias) \ + GDKfree(ralias); \ + goto X; + /* Every SQL statement is wrapped with a caller function that * regulates transaction bounds, debugger * The actual function is called with the arguments provided in the call. */ str -CQregister(Client cntxt, str alias, int which, lng heartbeats, lng startat, int cycles, MalBlkPtr mb) +CQregister(Client cntxt, str sname, str fname, int argc, atom **args, str alias, int which, lng heartbeats, lng startat, int cycles) { - str msg = MAL_SUCCEED; - InstrPtr sig = NULL, q; - Symbol s; + str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL; + InstrPtr p = NULL, q = NULL; + Symbol sym; CQnode *pnew; + MalBlkPtr mb = NULL, prev; const char* err_message = (which & mod_continuous_function) ? "function" : "procedure"; - int i, j, idx; + char* cq_id = NULL; + int i, idx, varid; + char buffer[IDLENGTH]; + backend* be = (backend*) cntxt->sqlcontext; + mvc *m = be->mvc; + sql_schema *s; + sql_subfunc *f; + list *l; + + prev = be->mb; if(cycles < 0 && cycles != CYCLES_NIL){ msg = createException(SQL,"cquery.register",SQLSTATE(42000) "The cycles value must be non negative\n"); @@ -456,34 +475,132 @@ CQregister(Client cntxt, str alias, int goto finish; } - if(which & mod_continuous_function){ /* for functions we need to remove the sql.mvc instruction */ - for(i = 1; i< mb->stop; i++){ - sig= getInstrPtr(mb,i); - if( getFunctionId(sig) == mvcRef){ - removeInstruction(mb, sig); - } + if (!m->sa) { + if((m->sa = sa_create()) == NULL) { + FREE_CQ_MB(finish) + } + } + if (!be->mb) { + if((be->mb = newMalBlk(8)) == NULL) { + FREE_CQ_MB(finish) } } - /* extract the actual procedure/function call and check for duplicate */ - for(i = 1; i< mb->stop; i++){ - sig= getInstrPtr(mb,i); - if( getModuleId(sig) == userRef) - break; - } - if( i == mb->stop){ - msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Cannot detect %s call %s.%s.\n", - err_message, getModuleId(sig), getFunctionId(sig)); + rschema = (sname == NULL || strcmp(sname, str_nil) == 0) ? m->session->schema_name : sname; + if((s = mvc_bind_schema(m, rschema)) == NULL) { //bind the schema + msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind schema %s\n", rschema); goto finish; } - if(!alias || strcmp(alias, str_nil) == 0) { - alias = GDKstrdup(getFunctionId(sig)); + if((l = list_create(NULL)) == NULL) { + FREE_CQ_MB(finish) + } + for (i = 0; i < argc; i++) { //prepare the arguments for the backend creation + atom *a = args[i]; + list_append(l, stmt_varnr(be, i, &a->tpe)); + } + if(argc) + f = sql_find_func(m->sa, s, fname, argc, (which & mod_continuous_function) ? F_FUNC : F_PROC, NULL); //bind the UDF + else + f = sql_bind_func_(m->sa, s, fname, l, (which & mod_continuous_function) ? F_FUNC : F_PROC); + if(!f) { + msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind %s %s.%s\n", err_message, sname, fname); + GDKfree(ralias); + list_destroy(l); + goto finish; + } + if (backend_create_subfunc(be, f, l) < 0) { //create the backend function + msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to generate backend function\n"); + GDKfree(ralias); + list_destroy(l); + goto finish; + } + list_destroy(l); + + (void) snprintf(buffer, sizeof(buffer), "cq_%d", ++CQ_counter); //set the CQ ID + if((cq_id = GDKstrdup(buffer)) == NULL) { + FREE_CQ_MB(finish) + } + if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it + GDKfree(cq_id); + FREE_CQ_MB(finish) + } + if((p = newInstruction(NULL, "user", cq_id)) == NULL) { + GDKfree(cq_id); + FREE_CQ_MB(finish) + } + p->token = FUNCTIONsymbol; + p->barrier = 0; + varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_any); + setDestVar(p, varid); + pushInstruction(mb, p); + + for (i = 0; i < argc; i++) { //add variables to the MAL block + atom *a = args[i]; + int type = atom_type(a)->type->localtype; + varid = 0; + + (void) snprintf(buffer, sizeof(buffer), "A%d", i); + a->varid = varid = newVariable(mb, buffer, strlen(buffer), type); + if (varid < 0) { + FREE_CQ_MB(finish) + } + if ((p = pushArgument(mb, p, varid)) == NULL) { + FREE_CQ_MB(finish) + } + setVarType(mb, varid, type); + setVarUDFtype(mb, 0); + } + for (i = 0; i < argc; i++) { //add assignments for arguments + p = newAssignment(mb); + if (p && args[i]->varid >= 0) { + p = pushArgument(mb, p, args[i]->varid); + } else if(p) { + (void) snprintf(buffer, sizeof(buffer), "A%d", i); + p = pushArgumentId(mb, p, buffer); + } + if (p == NULL) { + FREE_CQ_MB(finish) + } + } + if ((p = newStmt(mb, "user", fname)) == NULL) { //add the UDF call statement + FREE_CQ_MB(finish) + } + /*if (f->res && list_length(f->res)) { + sql_subtype *res = f->res->h->data; + setVarType(mb, getArg(q, 0), res->type->localtype); + setVarUDFtype(mb, getArg(q, 0)); + }*/ + for (i = 0; i < argc; i++) { //add arguments assignments + if ((p = pushArgument(mb, p, i + 2)) == NULL) { + FREE_CQ_MB(finish) + } + } + for (i = 0; i < argc; i++) { //initialize arguments assignments + atom *arg = args[i]; + ValPtr val = (ValPtr) &arg->data; + if (VALcopy(&mb->var[i + 2].value, val) == NULL) { + FREE_CQ_MB(finish) + } + setVarConstant(mb, i + 2); + setVarFixed(mb, i + 2); + } + + if(!alias || strcmp(alias, str_nil) == 0) { //set the alias + if((ralias = GDKstrdup(fname)) == NULL) { + FREE_CQ_MB(finish) + } } else { - alias = GDKstrdup(alias); + ralias = GDKstrdup(alias); + } + if(ralias == NULL) { + FREE_CQ_MB(finish) } - if( alias == NULL) { - msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); + if ((sym = findSymbol(cntxt->usermodule, "user", fname)) == NULL){ // access the actual procedure body + msg = createException(SQL,"cquery.register",SQLSTATE(3F000) "Cannot find %s user.%s.\n", err_message, fname); + GDKfree(cq_id); + GDKfree(ralias); + freeMalBlk(mb); goto finish; } @@ -495,10 +612,7 @@ CQregister(Client cntxt, str alias, int if( pnet == 0){ pnew = (CQnode *) GDKzalloc((INITIAL_MAXCQ) * sizeof(CQnode)); if( pnew == NULL) { - msg = createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL); - GDKfree(alias); - freeMalBlk(mb); - goto unlock; + FREE_CQ_MB(unlock) } pnetLimit = INITIAL_MAXCQ; pnet = pnew; @@ -506,97 +620,66 @@ CQregister(Client cntxt, str alias, int if (pnettop == pnetLimit) { pnew = (CQnode *) GDKrealloc(pnet, (pnetLimit+INITIAL_MAXCQ) * sizeof(CQnode)); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list