Changeset: 9e50bda62d61 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/9e50bda62d61 Modified Files: monetdb5/extras/rapi/rapi.c sql/backends/monet5/UDF/pyapi3/pyapi3.c Branch: sequences_7184 Log Message:
Implement cardinality-based bulk operations for rapi and pyapi3. diffs (truncated from 345 to 300 lines): diff --git a/monetdb5/extras/rapi/rapi.c b/monetdb5/extras/rapi/rapi.c --- a/monetdb5/extras/rapi/rapi.c +++ b/monetdb5/extras/rapi/rapi.c @@ -598,8 +598,6 @@ bailout: static str RAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit grouped) { sql_func * sqlfun = NULL; - str exprStr = *getArgReference_str(stk, pci, pci->retc + 1); - SEXP x, env, retval; SEXP varname = R_NilValue; SEXP varvalue = R_NilValue; @@ -621,6 +619,19 @@ static str RAPIeval(Client cntxt, MalBlk rapiClient = cntxt; + // If the first input argument is of type lng, this is a cardinality-only bulk operation. + int has_card_arg = 0; + lng card; // cardinality of non-bat inputs + if (getArgType(mb, pci, pci->retc) == TYPE_lng) { + has_card_arg=1; + card = *getArgReference_lng(stk, pci, pci->retc); + } + else { + has_card_arg=0; + card = 1; + } + str exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + has_card_arg); + if (!RAPIEnabled()) { throw(MAL, "rapi.eval", "Embedded R has not been enabled. 
Start server with --set %s=true", @@ -632,10 +643,10 @@ static str RAPIeval(Client cntxt, MalBlk } if (!grouped) { - sql_subfunc *sqlmorefun = (*(sql_subfunc**) getArgReference(stk, pci, pci->retc)); - if (sqlmorefun) sqlfun = (*(sql_subfunc**) getArgReference(stk, pci, pci->retc))->func; + sql_subfunc *sqlmorefun = (*(sql_subfunc**) getArgReference(stk, pci, pci->retc+has_card_arg)); + if (sqlmorefun) sqlfun = sqlmorefun->func; } else { - sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc); + sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc+has_card_arg); } args = (str*) GDKzalloc(sizeof(str) * pci->argc); @@ -653,7 +664,7 @@ static str RAPIeval(Client cntxt, MalBlk // NEW macro temporarily renamed to MNEW to allow including sql_catalog.h if (sqlfun != NULL && sqlfun->ops->cnt > 0) { - int carg = pci->retc + 2; + int carg = pci->retc + 2 + has_card_arg; argnode = sqlfun->ops->h; while (argnode) { char* argname = ((sql_arg*) argnode->data)->name; @@ -664,7 +675,7 @@ static str RAPIeval(Client cntxt, MalBlk } // the first unknown argument is the group, we don't really care for the rest. 
argnameslen = 2; - for (i = pci->retc + 2; i < pci->argc; i++) { + for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) { if (args[i] == NULL) { if (!seengrp && grouped) { args[i] = GDKstrdup("aggr_group"); @@ -679,30 +690,16 @@ static str RAPIeval(Client cntxt, MalBlk // install the MAL variables into the R environment // we can basically map values to int ("INTEGER") or double ("REAL") - for (i = pci->retc + 2; i < pci->argc; i++) { + for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) { int bat_type = getBatType(getArgType(mb,pci,i)); // check for BAT or scalar first, keep code left if (!isaBatType(getArgType(mb,pci,i))) { - b = COLnew(0, getArgType(mb, pci, i), 0, TRANSIENT); + const ValRecord *v = &stk->stk[getArg(pci, i)]; + b = BATconstant(0, v->vtype, VALptr(v), card, TRANSIENT); if (b == NULL) { msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL); goto wrapup; } - if ( getArgType(mb,pci,i) == TYPE_str) { - if (BUNappend(b, *getArgReference_str(stk, pci, i), false) != GDK_SUCCEED) { - BBPreclaim(b); - b = NULL; - msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL); - goto wrapup; - } - } else { - if (BUNappend(b, getArgReference(stk, pci, i), false) != GDK_SUCCEED) { - BBPreclaim(b); - b = NULL; - msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL); - goto wrapup; - } - } } else { b = BATdescriptor(*getArgReference_bat(stk, pci, i)); if (b == NULL) { @@ -751,7 +748,7 @@ static str RAPIeval(Client cntxt, MalBlk goto wrapup; } argnames[0] = '\0'; - for (i = pci->retc + 2; i < pci->argc; i++) { + for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) { pos += snprintf(argnames + pos, argnameslen - pos, "%s%s", args[i], i < pci->argc - 1 ? 
", " : ""); } @@ -824,6 +821,7 @@ static str RAPIeval(Client cntxt, MalBlk "Failed to convert column %i", i); goto wrapup; } + // bat return if (isaBatType(getArgType(mb,pci,i))) { *getArgReference_bat(stk, pci, i) = b->batCacheid; @@ -940,6 +938,7 @@ static mel_func rapi_init_funcs[] = { pattern("rapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))), command("rapi", "prelude", RAPIprelude, false, "", args(1,1, arg("",void))), pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))), + pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("card", lng), arg("fptr",ptr),arg("expr",str))), pattern("batrapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))), pattern("batrapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))), { .imp=NULL } diff --git a/sql/backends/monet5/UDF/pyapi3/pyapi3.c b/sql/backends/monet5/UDF/pyapi3/pyapi3.c --- a/sql/backends/monet5/UDF/pyapi3/pyapi3.c +++ b/sql/backends/monet5/UDF/pyapi3/pyapi3.c @@ -200,16 +200,29 @@ static str PyAPIeval(Client cntxt, MalBl throw(MAL, "pyapi3.eval", SQLSTATE(PY000) "Embedded Python is enabled but an error was " "thrown during initialization."); } + + // If the first input argument is of type lng, this is a cardinality-only bulk operation. 
+ int has_card_arg = 0; + lng card; // cardinality of non-bat inputs + if (getArgType(mb, pci, pci->retc) == TYPE_lng) { + has_card_arg=1; + card = *getArgReference_lng(stk, pci, pci->retc); + } + else { + has_card_arg=0; + card = 1; + } + if (!grouped) { sql_subfunc *sqlmorefun = - (*(sql_subfunc **)getArgReference(stk, pci, pci->retc)); + (*(sql_subfunc **)getArgReference(stk, pci, pci->retc + has_card_arg)); if (sqlmorefun) { sqlfun = sqlmorefun->func; } } else { - sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc); + sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc + has_card_arg); } - exprStr = *getArgReference_str(stk, pci, pci->retc + 1); + exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + has_card_arg); varres = sqlfun ? sqlfun->varres : 0; retcols = !varres ? pci->retc : -1; @@ -219,9 +232,9 @@ static str PyAPIeval(Client cntxt, MalBl throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " arguments."); } - if ((pci->argc - (pci->retc + 2)) * sizeof(PyInput) > 0) { + if ((pci->argc - (pci->retc + 2 + has_card_arg)) * sizeof(PyInput) > 0) { pyinput_values = - GDKzalloc((pci->argc - (pci->retc + 2)) * sizeof(PyInput)); + GDKzalloc((pci->argc - (pci->retc + 2 + has_card_arg)) * sizeof(PyInput)); if (pyinput_values == NULL) { GDKfree(args); @@ -232,7 +245,7 @@ static str PyAPIeval(Client cntxt, MalBl // Analyse the SQL_Func structure to get the parameter names if (sqlfun != NULL && sqlfun->ops->cnt > 0) { - unnamedArgs = pci->retc + 2; + unnamedArgs = pci->retc + 2 + has_card_arg; argnode = sqlfun->ops->h; while (argnode) { char *argname = ((sql_arg *)argnode->data)->name; @@ -250,14 +263,14 @@ static str PyAPIeval(Client cntxt, MalBl // We name all the unknown arguments, if grouping is enabled the first // unknown argument that is the group variable, we name this 'aggr_group' - for (i = pci->retc + 2; i < argcount; i++) { + for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) { if (args[i] == NULL) { if (!seengrp && grouped) 
{ args[i] = GDKstrdup("aggr_group"); seengrp = TRUE; } else { char argbuf[64]; - snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1); + snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - (1 + has_card_arg)); args[i] = GDKstrdup(argbuf); } } @@ -268,24 +281,35 @@ static str PyAPIeval(Client cntxt, MalBl // (a thread can fork while another process is in the lock, which means we // can get stuck permanently) argnode = sqlfun && sqlfun->ops->cnt > 0 ? sqlfun->ops->h : NULL; - for (i = pci->retc + 2; i < argcount; i++) { - PyInput *inp = &pyinput_values[i - (pci->retc + 2)]; + for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) { + PyInput *inp = &pyinput_values[i - (pci->retc + 2 + has_card_arg)]; if (!isaBatType(getArgType(mb, pci, i))) { inp->scalar = true; inp->bat_type = getArgType(mb, pci, i); inp->count = 1; - if (inp->bat_type == TYPE_str) { - inp->dataptr = getArgReference_str(stk, pci, i); - } else { - inp->dataptr = getArgReference(stk, pci, i); + + if (!has_card_arg) { + if (inp->bat_type == TYPE_str) { + inp->dataptr = getArgReference_str(stk, pci, i); + } else { + inp->dataptr = getArgReference(stk, pci, i); + } + } + else { + const ValRecord *v = &stk->stk[getArg(pci, i)]; + b = BATconstant(0, v->vtype, VALptr(v), card, TRANSIENT); + if (b == NULL) { + msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL); + goto wrapup; + } + inp->count = BATcount(b); + inp->bat_type = b->ttype; + inp->bat = b; } } else { b = BATdescriptor(*getArgReference_bat(stk, pci, i)); if (b == NULL) { - msg = createException( - MAL, "pyapi3.eval", - SQLSTATE(PY000) "The BAT passed to the function (argument #%d) is NULL.\n", - i - (pci->retc + 2) + 1); + msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL); goto wrapup; } seqbase = b->hseqbase; @@ -620,7 +644,7 @@ static str PyAPIeval(Client cntxt, MalBl // arrays) // We will put the python arrays in a PyTuple object, we will use this // PyTuple object as the set of 
arguments to call the Python function - pArgs = PyTuple_New(argcount - (pci->retc + 2) + + pArgs = PyTuple_New(argcount - (pci->retc + 2 + has_card_arg) + (code_object == NULL ? additional_columns : 0)); pColumns = PyDict_New(); pColumnTypes = PyDict_New(); @@ -631,23 +655,23 @@ static str PyAPIeval(Client cntxt, MalBl #endif // Now we will loop over the input BATs and convert them to python objects - for (i = pci->retc + 2; i < argcount; i++) { + for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) { PyObject *result_array; // t_start and t_end hold the part of the BAT we will convert to a Numpy // array, by default these hold the entire BAT [0 - BATcount(b)] - size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 2)].count; + size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 2 + has_card_arg)].count; // There are two possibilities, either the input is a BAT, or the input // is a scalar // If the input is a scalar we will convert it to a python scalar // If the input is a BAT, we will convert it to a numpy array - if (pyinput_values[i - (pci->retc + 2)].scalar) { + if (pyinput_values[i - (pci->retc + 2 + has_card_arg)].scalar) { result_array = PyArrayObject_FromScalar( - &pyinput_values[i - (pci->retc + 2)], &msg); + &pyinput_values[i - (pci->retc + 2 + has_card_arg)], &msg); } else { - int type = pyinput_values[i - (pci->retc + 2)].bat_type; + int type = pyinput_values[i - (pci->retc + 2 + has_card_arg)].bat_type; result_array = PyMaskedArray_FromBAT( - &pyinput_values[i - (pci->retc + 2)], t_start, t_end, &msg, + &pyinput_values[i - (pci->retc + 2 + has_card_arg)], t_start, t_end, &msg, !enable_zerocopy_input && type != TYPE_void); } if (result_array == NULL) { @@ -659,12 +683,12 @@ static str PyAPIeval(Client cntxt, MalBl } if (code_object == NULL) { PyObject *arg_type = PyString_FromString( - BatType_Format(pyinput_values[i - (pci->retc + 2)].bat_type)); + BatType_Format(pyinput_values[i - (pci->retc + 2 + has_card_arg)].bat_type)); 
PyDict_SetItemString(pColumns, args[i], result_array); PyDict_SetItemString(pColumnTypes, args[i], arg_type); Py_DECREF(arg_type); } - pyinput_values[i - (pci->retc + 2)].result = result_array; + pyinput_values[i - (pci->retc + 2 + has_card_arg)].result = result_array; PyTuple_SetItem(pArgs, ai++, result_array); } if (code_object == NULL) { @@ -719,7 +743,7 @@ static str PyAPIeval(Client cntxt, MalBl size_t *group_counts = NULL; oid *aggr_group_arr = NULL; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list