Changeset: 86a80eabe40a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=86a80eabe40a
Added Files:
	sql/backends/monet5/Tests/pyapi18.sql
	sql/backends/monet5/Tests/pyapi18.stable.err
	sql/backends/monet5/Tests/pyapi18.stable.out
Modified Files:
	monetdb5/extras/pyapi/connection.c
	monetdb5/extras/pyapi/pyapi.c
	monetdb5/extras/pyapi/pytypes.h
	sql/backends/monet5/Tests/All
	tools/embeddedpy/embeddedpy.c
Branch: pyapi
Log Message:

Added support for DECIMAL, DATE, TIME and TIMESTAMP SQL types. diffs (truncated from 777 to 300 lines): diff --git a/monetdb5/extras/pyapi/connection.c b/monetdb5/extras/pyapi/connection.c --- a/monetdb5/extras/pyapi/connection.c +++ b/monetdb5/extras/pyapi/connection.c @@ -50,7 +50,7 @@ static PyObject * input.bat_type = ATOMstorage(getColumnType(b->T->type)); input.scalar = false; - numpy_array = PyMaskedArray_FromBAT(&input, 0, input.count, &res); + numpy_array = PyMaskedArray_FromBAT(self->cntxt, &input, 0, input.count, &res); if (!numpy_array) { _connection_cleanup_result(output); PyErr_Format(PyExc_Exception, "SQL Query Failed: %s", (res ? res : "<no error>")); diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -39,6 +39,10 @@ #include "benchmark.h" #include "lazyarray.h" +#include "sql_scenario.h" +#include "sql_cast.h" +//#include "sql_cast_impl_up_to_flt.h" + #ifndef WIN32 // These libraries are used for PYTHON_MAP operations on Linux [to start new processes and wait on them] #include <sys/types.h> @@ -357,6 +361,11 @@ PyAPIevalAggrMap(Client cntxt, MalBlkPtr static char *PyError_CreateException(char *error_text, char *pycall); + +static enum _sqltype get_sql_token(sql_subtype *sqltype); +str ConvertFromSQLType(Client cntxt, BAT *b, enum _sqltype sqltype, sql_subtype *sql_subtype, BAT **ret_bat, int *ret_type); +str ConvertToSQLType(Client cntxt, BAT *b, enum _sqltype sqltype, sql_subtype *sql_subtype, BAT **ret_bat, int *ret_type); + //! The main PyAPI function, this function does everything PyAPI related //! It takes as argument a bunch of input BATs, a python function, and outputs a number of BATs //! This function follows the following pipeline @@ -396,6 +405,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb int retcols = !varres ? 
pci->retc : -1; bool holds_gil = !mapped; + (void) cntxt; if (!PyAPIEnabled()) { @@ -471,6 +481,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb } // Construct PyInput objects, we do this before any multiprocessing because there is some locking going on in there, and locking + forking = bad idea (a thread can fork while another process is in the lock, which means we can get stuck permanently) + argnode = sqlfun && sqlfun->ops->cnt > 0 ? sqlfun->ops->h : NULL; for (i = pci->retc + 2; i < pci->argc; i++) { PyInput *inp = &pyinput_values[i - (pci->retc + 2)]; @@ -497,7 +508,13 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb inp->bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i))); inp->bat = b; } + if (argnode) { + inp->sql_subtype = &((sql_arg*)argnode->data)->type; + + argnode = argnode->next; + } } + if (!mapped) { MT_lock_set(&pyapiLock, "pyapi.evaluate"); if (python_call_active) { @@ -705,7 +722,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb } else #endif { - result_array = PyMaskedArray_FromBAT(&pyinput_values[i - (pci->retc + 2)], t_start, t_end, &msg); + result_array = PyMaskedArray_FromBAT(cntxt, &pyinput_values[i - (pci->retc + 2)], t_start, t_end, &msg); } } if (result_array == NULL) { @@ -883,7 +900,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // WARNING: Because we could be converting to a NPY_STRING or NPY_UNICODE array (if the desired type is TYPE_str or TYPE_hge), this means that memory usage can explode // because NPY_STRING/NPY_UNICODE arrays are 2D string arrays with fixed string length (so if there's one very large string the size explodes quickly) // if someone has some problem with memory size exploding when using PYTHON_MAP but it being fine in regular PYTHON this is probably the issue - int bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i))); + int bat_type = getColumnType(getArgType(mb,pci,i)); PyObject *new_array = PyArray_FromAny(ret->numpy_array, PyArray_DescrFromType(BatType_ToPyType(bat_type)), 1, 1, NPY_ARRAY_CARRAY | 
NPY_ARRAY_FORCECAST, NULL); if (new_array == NULL) { msg = createException(MAL, "pyapi.eval", "Could not convert the returned NPY_OBJECT array to the desired array of type %s.\n", BatType_Format(bat_type)); @@ -947,10 +964,12 @@ returnvalues: /*[RETURN_VALUES]*/ VERBOSE_MESSAGE("Returning values.\n"); + argnode = sqlfun && sqlfun->res ? sqlfun->res->h : NULL; for (i = 0; i < retcols; i++) { PyReturn *ret = &pyreturn_values[i]; int bat_type = TYPE_any; + sql_subtype *sql_subtype = argnode ? &((sql_arg*)argnode->data)->type : NULL; if (!varres) { bat_type = getColumnType(getArgType(mb,pci,i)); @@ -962,7 +981,7 @@ returnvalues: bat_type = PyType_ToBat(ret->result_type); } - b = PyObject_ConvertToBAT(ret, bat_type, i, seqbase, &msg); + b = PyObject_ConvertToBAT(ret, sql_subtype, bat_type, i, seqbase, &msg); if (b == NULL) { goto wrapup; } @@ -976,6 +995,9 @@ returnvalues: { // single value return, only for non-grouped aggregations VALinit(&stk->stk[pci->argv[i]], bat_type, Tloc(b, BUNfirst(b))); } + if (argnode) { + argnode = argnode->next; + } msg = MAL_SUCCEED; } wrapup: @@ -1309,11 +1331,30 @@ wrapup: return vararray; } -PyObject *PyMaskedArray_FromBAT(PyInput *inp, size_t t_start, size_t t_end, char **return_message) +PyObject *PyMaskedArray_FromBAT(Client cntxt, PyInput *inp, size_t t_start, size_t t_end, char **return_message) { - BAT *b = inp->bat; + BAT *b; char *msg; - PyObject *vararray = PyArrayObject_FromBAT(inp, t_start, t_end, return_message); + PyObject *vararray; + enum _sqltype sql_type = get_sql_token(inp->sql_subtype); + + if (sql_type != sql_none) { // if the sql type is set, we have to do some conversion + if (inp->scalar) { + // todo: scalar SQL types + msg = PyError_CreateException("Scalar SQL types haven't been implemented yet... 
sorry", NULL); + goto wrapup; + } else { + BAT *ret_bat = NULL; + msg = ConvertFromSQLType(cntxt, inp->bat, sql_type, inp->sql_subtype, &ret_bat, &inp->bat_type); + if (msg != MAL_SUCCEED) { + goto wrapup; + } + inp->bat = ret_bat; + } + } + b = inp->bat; + + vararray = PyArrayObject_FromBAT(inp, t_start, t_end, return_message); if (vararray == NULL) { return NULL; } @@ -1601,7 +1642,11 @@ PyObject *PyArrayObject_FromBAT(PyInput } #endif default: - msg = createException(MAL, "pyapi.eval", "unknown argument type "); + if (!inp->sql_subtype || !inp->sql_subtype->type) { + msg = createException(MAL, "pyapi.eval", "unknown argument type"); + } else { + msg = createException(MAL, "pyapi.eval", "Unsupported SQL Type: %s", inp->sql_subtype->type->sqlname); + } goto wrapup; } if (vararray == NULL) { @@ -1931,14 +1976,31 @@ wrapup: return FALSE; } -BAT *PyObject_ConvertToBAT(PyReturn *ret, int bat_type, int i, int seqbase, char **return_message) +BAT *PyObject_ConvertToBAT(PyReturn *ret, sql_subtype *type, int bat_type, int i, int seqbase, char **return_message) { BAT *b = NULL; size_t index_offset = 0; char *msg; size_t iu; + int sql_token; if (ret->multidimensional) index_offset = i; + + sql_token = get_sql_token(type); + switch(sql_token) + { + case sql_timestamp: + case sql_time: + case sql_date: + bat_type = TYPE_str; + break; + case sql_decimal: + bat_type = TYPE_dbl; + break; + default: + break; + } + VERBOSE_MESSAGE("- Returning a Numpy Array of type %s of size %zu and storing it in a BAT of type %s\n", PyType_Format(ret->result_type), ret->count, BatType_Format(bat_type)); switch (bat_type) { @@ -2116,8 +2178,201 @@ BAT *PyObject_ConvertToBAT(PyReturn *ret msg = createException(MAL, "pyapi.eval", "Unrecognized BAT type %s.\n", BatType_Format(bat_type)); goto wrapup; } + + if (sql_token != sql_none) { + BAT *result; + msg = ConvertToSQLType(NULL, b, sql_token, type, &result, &bat_type); + if (msg != MAL_SUCCEED) { + goto wrapup; + } + b = result; + } + return b; 
wrapup: *return_message = msg; return NULL; } + + +str ConvertFromSQLType(Client cntxt, BAT *b, enum _sqltype sqltype, sql_subtype *sql_subtype, BAT **ret_bat, int *ret_type) +{ + str res = MAL_SUCCEED; + int conv_type; + + switch(sqltype) + { + case sql_timestamp: + case sql_time: + case sql_date: + conv_type = TYPE_str; + break; + case sql_decimal: + conv_type = TYPE_dbl; + break; + default: + return createException(MAL, "pyapi.eval", "Convert From SQL Type: Unrecognized SQL type %s (%d).", sql_subtype->type->sqlname, sqltype); + } + + if (conv_type == TYPE_str) + { + // maybe there's a more elegant way for obj->str conversion than calling this MAL function? probably not + int eclass = -1; + int d1 = 0; + int s1 = 0; + int has_tz = 0; + bat bid = b->batCacheid; + int digits = 0; + + int i; + int nvar = 7; // variables we need to fill in + MalBlkRecord mb; + MalStack* stk = NULL; + InstrRecord* pci = NULL; + + switch(sqltype) + { + case sql_date: + eclass = EC_DATE; + d1 = 0; + break; + case sql_time: + eclass = EC_TIME; + d1 = 1; + break; + case sql_timestamp: + eclass = EC_TIMESTAMP; + d1 = 7; + break; + default: + break; + } + + // very black MAL magic below + stk = GDKmalloc(sizeof(MalStack) + nvar * sizeof(ValRecord)); + pci = GDKmalloc(sizeof(InstrRecord) + nvar * sizeof(int)); + assert(stk != NULL && pci != NULL); // cough, cough + for (i = 0; i < nvar; i++) { + pci->argv[i] = i; + } + + stk->stk[0].vtype = TYPE_bat; + stk->stk[1].val.ival = eclass; + stk->stk[1].vtype = TYPE_int; + stk->stk[2].val.ival = d1; + stk->stk[2].vtype = TYPE_int; + stk->stk[3].val.ival = s1; + stk->stk[3].vtype = TYPE_int; + stk->stk[4].val.ival = has_tz; + stk->stk[4].vtype = TYPE_int; + stk->stk[5].val.bval = bid; + stk->stk[5].vtype = TYPE_bat; + stk->stk[6].val.ival = digits; + stk->stk[6].vtype = TYPE_int; + + res = SQLbatstr_cast(cntxt, &mb, stk, pci); + + if (res == MAL_SUCCEED) { + *ret_bat = BATdescriptor(stk->stk[0].val.bval); + *ret_type = TYPE_str; + } else { 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list