Changeset: d38a839b1d0a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d38a839b1d0a Modified Files: monetdb5/extras/pyapi/pyapi.c monetdb5/extras/pyapi/pyapi.h monetdb5/extras/pyapi/pyapi.mal sql/backends/monet5/sql_gencode.c sql/include/sql_catalog.h sql/server/rel_psm.c sql/server/rel_select.c sql/server/rel_semantic.c sql/server/rel_updates.c sql/server/sql_mvc.h sql/server/sql_parser.h sql/server/sql_parser.y sql/server/sql_scan.c Branch: pythonloader Log Message:
first stage COPY INTO sometable FROM LOADER somepythonfunction(); diffs (truncated from 368 to 300 lines): diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -418,6 +418,12 @@ PyAPIevalAggr(Client cntxt, MalBlkPtr mb } str +PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + return PyAPIeval(cntxt, mb, stk, pci, 0, 0); +} + +str PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { return PyAPIeval(cntxt, mb, stk, pci, 1, 1); diff --git a/monetdb5/extras/pyapi/pyapi.h b/monetdb5/extras/pyapi/pyapi.h --- a/monetdb5/extras/pyapi/pyapi.h +++ b/monetdb5/extras/pyapi/pyapi.h @@ -96,6 +96,7 @@ pyapi_export str PyAPIevalStd(Client cnt pyapi_export str PyAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); pyapi_export str PyAPIevalStdMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); pyapi_export str PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +pyapi_export str PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); pyapi_export str PyAPIprelude(void *ret); diff --git a/monetdb5/extras/pyapi/pyapi.mal b/monetdb5/extras/pyapi/pyapi.mal --- a/monetdb5/extras/pyapi/pyapi.mal +++ b/monetdb5/extras/pyapi/pyapi.mal @@ -25,6 +25,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg: address PyAPIevalAggr comment "grouped aggregates through Python"; +pattern eval_loader(fptr:ptr,expr:str,arg:any...):any... +address PyAPIevalLoader +comment "loader functions through Python"; + # initializer code command prelude() :void address PyAPIprelude; pyapi.prelude(); @@ -44,6 +48,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg: address PyAPIevalAggr comment "grouped aggregates through Python"; +pattern eval_loader(fptr:ptr,expr:str,arg:any...):any... +address PyAPIevalLoader +comment "loader functions through Python"; + module pyapimap; # The generic Python interface diff --git a/sql/backends/monet5/sql_gencode.c b/sql/backends/monet5/sql_gencode.c --- a/sql/backends/monet5/sql_gencode.c +++ b/sql/backends/monet5/sql_gencode.c @@ -2135,7 +2135,7 @@ static int if (f->func->lang == FUNC_LANG_R || f->func->lang == FUNC_LANG_PY || f->func->lang == FUNC_LANG_MAP_PY) q = pushStr(mb, q, f->func->query); /* first dynamic output of copy* functions */ - if (f->func->type == F_UNION) + if (f->func->type == F_UNION || f->func->type == F_LOADER) q = table_func_create_result(mb, q, f->func, f->res); if (list_length(s->op1->op4.lval)) tpe = tail_type(s->op1->op4.lval->h->data); @@ -3038,6 +3038,10 @@ backend_create_py_func(backend *be, sql_ f->mod = "pyapi"; f->imp = "eval_aggr"; break; + case F_LOADER: + f->mod = "pyapi"; + f->imp = "eval_loader"; + break; case F_PROC: /* no output */ case F_FUNC: default: /* ie also F_FILT and F_UNION for now */ diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h --- a/sql/include/sql_catalog.h +++ b/sql/include/sql_catalog.h @@ -281,6 +281,7 @@ typedef struct sql_arg { #define F_FILT 4 #define F_UNION 5 #define F_ANALYTIC 6 +#define F_LOADER 7 #define IS_FUNC(f) (f->type == F_FUNC) #define IS_PROC(f) (f->type == F_PROC) @@ -288,6 +289,7 @@ typedef struct sql_arg { #define IS_FILT(f) (f->type == F_FILT) #define IS_UNION(f) (f->type == F_UNION) #define IS_ANALYTIC(f) (f->type == F_ANALYTIC) +#define IS_LOADER(f) (f->type == F_LOADER) #define FUNC_LANG_INT 0 /* internal */ #define FUNC_LANG_MAL 1 /* create sql external mod.func */ diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c --- a/sql/server/rel_psm.c +++ b/sql/server/rel_psm.c @@ -699,10 +699,12 @@ rel_create_func(mvc *sql, dlist *qname, char is_table = (res && res->token == SQL_TABLE); char is_aggr = (type == F_AGGR); char is_func = (type != F_PROC); - char *F = is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE"); + char is_loader = (type != F_LOADER); + + char *F = is_loader?"LOADER":(is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE")); char *KF = type==F_FILT?"FILTER ": type==F_UNION?"UNION ": ""; - assert(res || type == F_PROC || type == F_FILT); + assert(res || type == F_PROC || type == F_FILT || type == F_LOADER); if (is_table) type = F_UNION; @@ -784,18 +786,18 @@ rel_create_func(mvc *sql, dlist *qname, (lang == FUNC_LANG_MAP_PY)?"pyapimap":"unknown"; sql->params = NULL; if (create) { - f = mvc_create_func(sql, sql->sa, s, fname, l, restype, type, lang, mod, fname, lang_body, FALSE, vararg); + f = mvc_create_func(sql, sql->sa, s, fname, l, restype, type, lang, mod, fname, lang_body, (type == F_LOADER)?TRUE:FALSE, vararg); } else if (!sf) { return sql_error(sql, 01, "CREATE %s%s: R function %s.%s not bound", KF, F, s->base.name, fname ); - } else { + } /*else { sql_func *f = sf->func; f->mod = _STRDUP("rapi"); f->imp = _STRDUP("eval"); if (res && restype) f->res = restype; - f->sql = 0; /* native */ + f->sql = 0; f->lang = FUNC_LANG_INT; - } + }*/ } else if (body) { sql_arg *ra = (restype && !is_table)?restype->h->data:NULL; list *b = NULL; diff --git a/sql/server/rel_select.c b/sql/server/rel_select.c --- a/sql/server/rel_select.c +++ b/sql/server/rel_select.c @@ -602,7 +602,7 @@ rel_op_(mvc *sql, sql_schema *s, char *f f = sql_bind_func(sql->sa, s, fname, NULL, NULL, type); if (f && - ((ek.card == card_none && !f->res) || + ((ek.card == card_none && !f->res) || (ek.card != card_none && f->res))) { return exp_op(sql->sa, NULL, f); } else { diff --git a/sql/server/rel_semantic.c b/sql/server/rel_semantic.c --- a/sql/server/rel_semantic.c +++ b/sql/server/rel_semantic.c @@ -155,6 +155,7 @@ rel_semantic(mvc *sql, symbol *s) case SQL_DELETE: case SQL_COPYFROM: case SQL_BINCOPYFROM: + case SQL_COPYLOADER: case SQL_COPYTO: return rel_updates(sql, s); diff --git a/sql/server/rel_updates.c b/sql/server/rel_updates.c --- a/sql/server/rel_updates.c +++ b/sql/server/rel_updates.c @@ -15,6 +15,7 @@ #include "sql_privileges.h" #include "rel_optimizer.h" #include "rel_dump.h" +#include "rel_psm.h" #include "sql_symbol.h" static sql_exp * @@ -1452,6 +1453,86 @@ bincopyfrom(mvc *sql, dlist *qname, dlis return res; } + +static sql_rel * +copyfromloader(mvc *sql, dlist *qname, symbol *fcall) +{ + char *sname = qname_schema(qname); + char *tname = qname_table(qname); + + sql_schema *s = NULL; + sql_table *t = NULL; + sql_subtype tpe; + + node *n; + sql_rel *res; + list *exps, *args = NULL; + sql_exp *import; + dnode *l = fcall->data.lval->h; + char *fname = qname_fname(l->data.lval); + char *f_sname = qname_schema(l->data.lval); + sql_schema *f_s = sql->session->schema; + sql_subfunc *f = NULL; + + if (!copy_allowed(sql, 1)) { + (void) sql_error(sql, 02, "COPY INTO: insufficient privileges: " + "binary COPY INTO requires database administrator rights"); + return NULL; + } + + if (sname && !(s=mvc_bind_schema(sql, sname))) { + (void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", sname); + return NULL; + } + if (!s) + s = cur_schema(sql); + t = mvc_bind_table(sql, s, tname); + if (!t && !sname) { + s = tmp_schema(sql); + t = mvc_bind_table(sql, s, tname); + if (!t) + t = stack_find_table(sql, tname); + } + if (insert_allowed(sql, t, tname, "COPY INTO", "copy into") == NULL) { + return NULL; + } + if (sname) { + f_s = mvc_bind_schema(sql, f_sname); + if (!f_s) { + (void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", f_sname); + return NULL; + } + } + + // TODO: handle parameters to bind correct version + + f = sql_bind_func(sql->sa, f_s, fname, NULL, NULL, F_LOADER); + if (!f) { + (void) sql_error(sql, 02, "3F000!COPY INTO: no such loader function '%s'", fname); + return NULL; + } + f->res = table_column_types(sql->sa, t); + + sql_find_subtype(&tpe, "varchar", 0, 0); +// args = append( append( new_exp_list(sql->sa), +// exp_atom_str(sql->sa, t->s?t->s->base.name:NULL, &tpe)), +// exp_atom_str(sql->sa, t->base.name, &tpe)); + + import = exp_op(sql->sa, args, f); + if (!import) { + return NULL; + } + + exps = new_exp_list(sql->sa); + for (n = t->columns.set->h; n; n = n->next) { + sql_column *c = n->data; + append(exps, exp_column(sql->sa, t->base.name, c->base.name, &c->type, CARD_MULTI, c->null, 0)); + } + res = rel_table_func(sql->sa, NULL, import, exps, 1); + res = rel_insert_table(sql, t, t->base.name, res); + return res; +} + static sql_rel * rel_output(mvc *sql, sql_rel *l, sql_exp *sep, sql_exp *rsep, sql_exp *ssep, sql_exp *null_string, sql_exp *file) { @@ -1614,6 +1695,14 @@ rel_updates(mvc *sql, symbol *s) sql->type = Q_UPDATE; } break; + case SQL_COPYLOADER: + { + dlist *l = s->data.lval; + + ret = copyfromloader(sql, l->h->data.lval, l->h->next->data.sym); + sql->type = Q_UPDATE; + } + break; case SQL_COPYTO: { dlist *l = s->data.lval; diff --git a/sql/server/sql_mvc.h b/sql/server/sql_mvc.h --- a/sql/server/sql_mvc.h +++ b/sql/server/sql_mvc.h @@ -37,6 +37,8 @@ #define card_column 2 #define card_set 3 /* some operators require only a set (IN/EXISTS) */ #define card_relation 4 + + /* allowed to reduce (in the where and having parts we can reduce) */ /* different query execution modes (emode) */ diff --git a/sql/server/sql_parser.h b/sql/server/sql_parser.h --- a/sql/server/sql_parser.h +++ b/sql/server/sql_parser.h @@ -148,6 +148,7 @@ typedef enum tokens { SQL_ESCAPE, SQL_COPYFROM, SQL_BINCOPYFROM, + SQL_COPYLOADER, SQL_COPYTO, SQL_EXPORT, SQL_NEXT, diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y --- a/sql/server/sql_parser.y +++ b/sql/server/sql_parser.y @@ -583,7 +583,7 @@ SQLCODE SQLERROR UNDER WHENEVER %token TEMP TEMPORARY STREAM MERGE REMOTE REPLICA %token<sval> ASC DESC AUTHORIZATION _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list