Changeset: e7e8b6c61a3b for MonetDB
Modified Files:
Branch: default
Log Message:

handle unionfunctions more generaly now. Solves issues with correlated table 
returning functions
fixed problem with returning single values from psm functions where monetdb 
uses bats
too represent the single value.
added more checks for zero or one results

diffs (truncated from 751 to 300 lines):

diff --git a/clients/Tests/MAL-signatures.stable.out 
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -18886,6 +18886,7 @@ stdout of test 'MAL-signatures` in direc
 [ "sql",       "transaction_commit",   "pattern 
sql.transaction_commit(chain:int, name:str):void ",    
"SQLtransaction_commit;",       "A transaction statement (type can be 
commit,release,rollback or start)"        ]
 [ "sql",       "transaction_release",  "pattern 
sql.transaction_release(chain:int, name:str):void ",   
"SQLtransaction_release;",      "A transaction statement (type can be 
commit,release,rollback or start)"        ]
 [ "sql",       "transaction_rollback", "pattern 
sql.transaction_rollback(chain:int, name:str):void ",  
"SQLtransaction_rollback;",     "A transaction statement (type can be 
commit,release,rollback or start)"        ]
+[ "sql",       "unionfunc",    "pattern sql.unionfunc(mod:str, fcn:str, 
X_0:any...):any... ",  "SQLunionfunc;",        ""      ]
 [ "sql",       "update",       "pattern sql.update(mvc:int, sname:str, 
tname:str, cname:str, rids:any, upd:any):int ", "mvc_update_wrap;",     "Update 
the values of the column tname.cname. Returns sequence number for order 
dependence)"    ]
 [ "sql",       "update_schemas",       "pattern sql.update_schemas():void ",   
"SYSupdate_schemas;",   "Procedure triggered on update of the sys.schemas 
table"        ]
 [ "sql",       "update_tables",        "pattern sql.update_tables():void ",    
"SYSupdate_tables;",    "Procedure triggered on update of the sys._tables 
table"        ]
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -26900,6 +26900,7 @@ stdout of test 'MAL-signatures` in direc
 [ "sql",       "transaction_commit",   "pattern 
sql.transaction_commit(chain:int, name:str):void ",    
"SQLtransaction_commit;",       "A transaction statement (type can be 
commit,release,rollback or start)"        ]
 [ "sql",       "transaction_release",  "pattern 
sql.transaction_release(chain:int, name:str):void ",   
"SQLtransaction_release;",      "A transaction statement (type can be 
commit,release,rollback or start)"        ]
 [ "sql",       "transaction_rollback", "pattern 
sql.transaction_rollback(chain:int, name:str):void ",  
"SQLtransaction_rollback;",     "A transaction statement (type can be 
commit,release,rollback or start)"        ]
+[ "sql",       "unionfunc",    "pattern sql.unionfunc(mod:str, fcn:str, 
X_0:any...):any... ",  "SQLunionfunc;",        ""      ]
 [ "sql",       "update",       "pattern sql.update(mvc:int, sname:str, 
tname:str, cname:str, rids:any, upd:any):int ", "mvc_update_wrap;",     "Update 
the values of the column tname.cname. Returns sequence number for order 
dependence)"    ]
 [ "sql",       "update_schemas",       "pattern sql.update_schemas():void ",   
"SYSupdate_schemas;",   "Procedure triggered on update of the sys.schemas 
table"        ]
 [ "sql",       "update_tables",        "pattern sql.update_tables():void ",    
"SYSupdate_tables;",    "Procedure triggered on update of the sys._tables 
table"        ]
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -19,6 +19,8 @@
 #include "rel_optimizer.h"
 #include "sql_env.h"
 #include "sql_optimizer.h"
+#include "sql_gencode.h"
+#include "mal_builder.h"
 #define OUTER_ZERO 64
@@ -513,6 +515,10 @@ exp_bin(backend *be, sql_exp *e, stmt *l
                                        for(n=lst->op4.lval->h; n; n = n->next)
                                                list_append(l, const_column(be, 
                                        r = stmt_list(be, l);
+                               } else if (r->type == st_table && e->card == 
CARD_ATOM) { /* fetch value */
+                                       r = lst->op4.lval->h->data;
+                                       if (!r->aggr)
+                                               r = stmt_fetch(be, r);
                                if (r->type == st_list)
                                        r = stmt_table(be, r, 1);
@@ -1665,6 +1671,8 @@ rel2bin_table(backend *be, sql_rel *rel,
                int i;
                sql_subfunc *f = op->f;
                stmt *psub = NULL;
+               list *ops = NULL;
+               stmt *ids = NULL;
                if (rel->l) { /* first construct the sub relation */
                        sql_rel *l = rel->l;
@@ -1680,11 +1688,28 @@ rel2bin_table(backend *be, sql_rel *rel,
                                return NULL;
-               assert(f);
-               psub = exp_bin(be, op, sub, NULL, NULL, NULL, NULL, NULL); /* 
table function */
-               if (!psub) { 
-                       assert(sql->session->status == -10); /* Stack overflow 
errors shouldn't terminate the server */
-                       return NULL;
+               if (f->func->res && list_length(f->func->res) + 1 == 
list_length(rel->exps) && !f->func->varres) {
+                       /* add inputs in correct order ie loop through args of 
f and pass column */
+                       list *exps = op->l;
+                       ops = sa_list(be->mvc->sa);
+                       if (exps) {
+                               for (node *en = exps->h; en; en = en->next) {
+                                       sql_exp *e = en->data;
+                                       /* find column */
+                                       stmt *s = exp_bin(be, e, sub, NULL, 
+                                       if (en->next)
+                                               append(ops, s);
+                                       else /* last added exp is the ids (todo 
use name base lookup !!) */
+                                               ids = s;
+                               }
+                       }
+               } else {
+                       psub = exp_bin(be, op, sub, NULL, NULL, NULL, NULL, 
NULL); /* table function */
+                       if (!f || !psub) { 
+                               assert(sql->session->status == -10); /* Stack 
overflow errors shouldn't terminate the server */
+                               return NULL;
+                       }
                l = sa_list(sql->sa);
                if (f->func->res) {
@@ -1699,9 +1724,58 @@ rel2bin_table(backend *be, sql_rel *rel,
                                        list_append(l, s);
                        } else {
-                               assert(list_length(f->func->res) == 
-                               node *m;
-                               for(i = 0, n = f->func->res->h, m = 
rel->exps->h; n && m; n = n->next, m = m->next, i++ ) {
+                               node *m = rel->exps->h;
+                               int i = 0;
+                               /* correlated table returning function */
+                               if (list_length(f->func->res) + 1 == 
list_length(rel->exps)) {
+                                       /* use a simple nested loop solution 
for this case, ie 
+                                        * output a table of (input) row-ids, 
the output of the table producing function 
+                                        */
+                                       InstrPtr q = newStmt(be->mb, "sql", 
+                                       /* Generate output rowid column and 
output of function f */
+                                       for(i=0; m; m = m->next, i++) {
+                                               sql_exp *e = m->data;
+                                               int type = 
+                                               type = newBatType(type);
+                                               if (i)
+                                                       q = pushReturn(be->mb, 
q, newTmpVariable(be->mb, type));
+                                               else
+                                                       getArg(q, 0) = 
newTmpVariable(be->mb, type);
+                                       }
+                                       str mod = sql_func_mod(f->func);
+                                       str fcn = sql_func_imp(f->func);
+                                       q = pushStr(be->mb, q, mod);
+                                       q = pushStr(be->mb, q, fcn);
+                                       if (backend_create_func(be, f->func, 
NULL, ops) < 0)
+                                               return NULL;
+                                       psub = stmt_direct_func(be, q);
+                                       if (ids) /* push input rowids column */
+                                               q = pushArgument(be->mb, q, 
+                                       /* add inputs in correct order ie loop 
through args of f and pass column */
+                                       if (ops) {
+                                               for (node *en = ops->h; en; en 
= en->next) {
+                                                       stmt *op = en->data;
+                                                       q = 
pushArgument(be->mb, q, op->nr);
+                                               }
+                                       }
+                                       /* name output of dependent columns, 
output of function is handled the same as without correlation */
+                                       int len = 
+                                       assert(len== 1);
+                                       for(i=0, m=rel->exps->h; m && i<len; m 
= m->next, i++ ) {
+                                               sql_exp *exp = m->data;
+                                               stmt *s = stmt_rs_column(be, 
psub, i, exp_subtype(exp)); 
+                                               s = stmt_alias(be, s, exp->l, 
+                                               list_append(l, s);
+                                       }
+                               }
+                               for(n = f->func->res->h; n && m; n = n->next, m 
= m->next, i++ ) {
                                        sql_arg *a = n->data;
                                        sql_exp *exp = m->data;
                                        stmt *s = stmt_rs_column(be, psub, i, 
@@ -1710,7 +1784,9 @@ rel2bin_table(backend *be, sql_rel *rel,
                                        s = stmt_alias(be, s, rnme, a->name);
                                        list_append(l, s);
+#if 0
                                if (list_length(f->res) == 
list_length(f->func->res) + 1) {
+                                       assert(0);
                                        /* add missing %TID% column */
                                        sql_subtype *t = f->res->t->data;
                                        stmt *s = stmt_rs_column(be, psub, i, 
@@ -1719,6 +1795,7 @@ rel2bin_table(backend *be, sql_rel *rel,
                                        s = stmt_alias(be, s, rnme, TID);
                                        list_append(l, s);
                assert(rel->flag != TABLE_PROD_FUNC || !sub || !(sub->nrcols));
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -5553,3 +5553,123 @@ bailout:
        return msg;
+/* input id,row-input-values 
+ * for each id call function(with row-input-values) return table
+ * return for each id the table, ie id (*length of table) and table results
+ */
+SQLunionfunc(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+       int arg = pci->retc;
+       str mod, fcn, ret = MAL_SUCCEED;
+       InstrPtr npci;
+       mod = *getArgReference_str(stk, pci, arg++);
+       fcn = *getArgReference_str(stk, pci, arg++);
+       npci = newStmt(mb, mod, fcn);
+       for (int i = 1; i < pci->retc; i++) {
+               int type = getArgType(mb, pci, i);
+               if (i==1)
+                       getArg(npci, 0) = newTmpVariable(mb, type);
+               else
+                       npci = pushReturn(mb, npci, newTmpVariable(mb, type));
+       }
+       for (int i = pci->retc+2+1; i < pci->argc; i++) {
+               int type = getBatType(getArgType(mb, pci, i));
+               npci = pushNil(mb, npci, type);
+       }
+       /* check program to get the proper malblk */
+       if (chkInstruction(cntxt->usermodule, mb, npci))
+               return "ERORROR";
+       if (npci) {
+               BAT **res; 
+               BAT **input; 
+               BATiter *bi;
+               BUN cnt = 0;
+               int nrinput = pci->argc - 2 - pci->retc;
+               input = GDKmalloc(sizeof(BAT*) * nrinput);
+               bi = GDKmalloc(sizeof(BATiter) * nrinput);
+               assert(nrinput == pci->retc);
+               for (int i = 0, j = pci->retc+2; j < pci->argc; i++, j++) {
+                       bat *b = getArgReference_bat(stk, pci, j);
+                       input[i] = BATdescriptor(*b); 
+                       bi[i] = bat_iterator(input[i]);
+                       cnt = BATcount(input[i]); 
+               }
+               /* create result bats */
+               res = GDKmalloc(sizeof(BAT*) * pci->retc);
+               for (int i = 0; i<pci->retc; i++) {
+                       int type = getArgType(mb, pci, i);
+                       res[i] = COLnew(0, getBatType(type), cnt, TRANSIENT);
+               }
+               MalBlkPtr nmb = copyMalBlk(npci->blk);
+               MalStkPtr env = prepareMALstack(nmb, nmb->vsize); /* needed for 
result */
+               InstrPtr q = getInstrPtr(nmb, 0);
+               for(BUN cur = 0; cur<cnt && !ret; cur++ ) {
+                       MalStkPtr nstk = prepareMALstack(nmb, nmb->vsize);
+                       int i,ii;
+                       /* copy (input) arguments onto destination stack, 
skipping rowid col */
+                       for (i = 1, ii = q->retc; ii < q->argc; ii++) {
+                               ValPtr lhs = &nstk->stk[q->argv[ii]];
+                               ptr rhs = (ptr)BUNtail(bi[i], cur);
+                               assert(lhs->vtype != TYPE_bat);
+                               if (VALset(lhs, input[i]->ttype, rhs) == NULL) {
+                                       ret = createException(MAL, 
"mal.interpreter", MAL_MALLOC_FAIL);
+                                       break;
+                               }
+                       }
+                       if (ret == MAL_SUCCEED && ii == q->argc) {
+                               BAT *fres = NULL;
+                               BUN ccnt = 0;
+                               ret = runMALsequence(cntxt, nmb, 1, nmb->stop, 
nstk, env /* copy result in nstk first instruction*/, q); 
+                               /* insert into result */
+                               fres = 
+                               ccnt = BATcount(fres);
+                               BAT *p = NULL;
+                               if (BATappend(res[0], p = 
BATconstant(fres->hseqbase, res[0]->ttype, (ptr)BUNtail(bi[0], cur), ccnt, 0), 
+                                       ret = createException(MAL, 
"mal.interpreter", MAL_MALLOC_FAIL);
+                               BBPunfix(p->batCacheid);
+                               BBPunfix(fres->batCacheid);
+                               i=1;
+                               for (ii = 0; i < pci->retc && !ret; i++) {
+                                       BAT *b = 
+                                       if (BATappend(res[i], b, NULL, FALSE) 
+                                               ret = createException(MAL, 
"mal.interpreter", MAL_MALLOC_FAIL);
+                                       BBPrelease(b->batCacheid); /* release 
ref from env stack */
+                                       BBPunfix(b->batCacheid);   /* free 
pointer */
+                               }
+                       }
+                       GDKfree(nstk);
+               }
+               GDKfree(env);
+               freeMalBlk(nmb);
+               for (int i = 0; i<pci->retc; i++) {
+                       bat *b = getArgReference_bat(stk, pci, i);
+                       *b = res[i]->batCacheid;
+                       BBPkeepref(*b);
+               }
+               GDKfree(res);
+               for (int i = 0; i<nrinput; i++)
+                       BBPunfix(input[i]->batCacheid);
+               GDKfree(input);
+               GDKfree(bi);
+       }
+       return ret;
diff --git a/sql/backends/monet5/sql.h b/sql/backends/monet5/sql.h
--- a/sql/backends/monet5/sql.h
+++ b/sql/backends/monet5/sql.h
@@ -314,4 +314,6 @@ sql5_export str SQLhot_snapshot(void *re
 sql5_export str SQLsession_prepared_statements(Client cntxt, MalBlkPtr mb, 
MalStkPtr stk, InstrPtr pci);
checkin-list mailing list

Reply via email to